17. Канали

17. Канали

17. Канали

6 декември 2017

Административни неща

Преговор

Нишки

Разлика между thread::spawn и thread::Builder::new().spawn()

use std::thread;

let handle = thread::spawn(|| println!("изненада!")); // JoinHandle<()>

let probably_handle = thread::Builder::new()
    .name(String::from("safe"))
    .spawn(|| println!("сюрприз!")); // Result<JoinHandle<()>>

handle.join();

probably_handle.unwrap().join();
// match probably_handle { ... }

Преговор

Нишки

Какво може да правим с JoinHandle?

let handle = thread::spawn(|| 2_i32 + 2); // JoinHandle<i32>

match handle.join() {
    Ok(value) => println!("{}", value),
    Err(e) => panic!("Изчислението беше ТВЪРДЕ СЛОЖНО! {:?}", e)
}

Преговор

Споделена памет със Arc, Mutex, move closures

let counter = Arc::new(Mutex::new(0_u32)); // Arc<Mutex<u32>>
let mut handles = Vec::new();

for _ in 0..10 {
    let counter = Arc::clone(&counter); // Arc<Mutex<u32>>
    let handle = thread::spawn(move || {
      let mut num = counter.
          lock().     // LockResult<MutexGuard<T>>
          unwrap();   // MutexGuard<T>

        // *MutexGuard<T> -> *&T
        *num += 1;
    });
    handles.push(handle);
}

Преговор

Send и Sync

Atomics

Barrier

use std::sync::{Arc, Barrier};
use std::thread;

let mut handles = Vec::with_capacity(10);

// създаваме бариера, която ще блокира първите 9 нишки, които извикат .wait()
// и ще събуди всички когато 10-тата извика .wait()
let barrier = Arc::new(Barrier::new(10));

Barrier

for _ in 0..10 {
    let c = Arc::clone(&barrier);

    // Еднаквите съобщения ще се принтират заедно
    handles.push(thread::spawn(move|| {
        println!("before wait");
        c.wait();
        println!("after wait");
    }));
}

for handle in handles {
    handle.join().unwrap();
}

Condvar

fn main() {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair2 = Arc::clone(&pair);

    thread::spawn(move || {
        let &(ref mutex, ref cvar) = &*pair2;
        let mut started = mutex.lock().unwrap();

        *started = true;

        // Казваме на `cvar` че сме променили стойността.
        // Ако има чакаща нишка с `cvar.wait()` тя ще бъде събудена
        // Ако няма нищо няма да се случи - извиквания на `notify_one` или `notify_all` не се бъферират
        cvar.notify_one();
    });

    // ...

Condvar

fn main() {
    // ...

    let &(ref mutex, ref cvar) = &*pair;
    let mut started = mutex.lock().unwrap();

    // Изчакваме новата нишка да започне
    // Възможно е тази нишката да бъде събудена спонтанно, без да е извикано `cvar.notify_*()`,
    // затова проверяваме дали е изпълнено условието в цикъл
    while !*started {
        // `cvar.wait()` прави следното
        // - отключва мутекса `started`
        // - слага нишката да спи
        // - когато нишката се събуди, заключва мутекса и го връща
        started = cvar.wait(started).unwrap();
    }
}

RwLock

let lock = RwLock::new(5);

// many reader locks can be held at once
{
    let r1 = lock.read().unwrap();
    let r2 = lock.read().unwrap();
    assert_eq!(*r1, 5);
    assert_eq!(*r2, 5);
} // read locks are dropped at this point

// only one write lock may be held, however
{
    let mut w = lock.write().unwrap();
    *w += 1;
    assert_eq!(*w, 6);
} // write lock is dropped here

RwLock

RwLock

приоритет

RwLock

приоритет

Channels

Multiple Producer, Single Consumer

Channels

Don't communicate by sharing memory,

share memory by communicating

-- go-lang

Канали

use std::sync::mpsc;
use std::thread;

fn main() {
    let (sender, receiver) = mpsc::channel();

    thread::spawn(move || {
        sender.send(10).unwrap();
    });

    println!("received {}", receiver.recv().unwrap());  // 10
}

Типове канали

Типове канали

let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    sender.send(1).unwrap();
    sender.send(2).unwrap();
    sender.send(3).unwrap();
});

assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
assert_eq!(receiver.recv().unwrap(), 3);

Типове канали

Типове канали

let (sender, receiver) = mpsc::sync_channel(1);

thread::spawn(move || {
    // записва съобщението и връща веднага
    sender.send(1).unwrap();

    // ще блокира докато главната нишка не извика `receiver.recv()`
    sender.send(2).unwrap();
});

assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);

Множество изпращачи

let (sender, receiver) = mpsc::channel();
let shared_sender = Arc::new(sender);

thread::spawn(move || {
    shared_sender.send(1).unwrap();
    shared_sender.send(2).unwrap();
});

thread::spawn(move || {
    shared_sender.send(3).unwrap();
    shared_sender.send(4).unwrap();
});

println!("{} {} {} {}", receiver.recv().unwrap(), receiver.recv().unwrap(),
    receiver.recv().unwrap(), receiver.recv().unwrap());

Множество изпращачи

error[E0277]: the trait bound `std::sync::mpsc::Sender: std::marker::Sync` is not satisfied
--> src/main.rs:9:5
  |
9 |     thread::spawn(move || {
  |     ^^^^^^^^^^^^^ `std::sync::mpsc::Sender` cannot be shared between threads safely
  | 

Множество изпращачи

let (sender, receiver) = mpsc::channel();
let sender2 = sender.clone();

thread::spawn(move || {
    sender.send(1).unwrap();
    sender.send(2).unwrap();
});

thread::spawn(move || {
    sender2.send(3).unwrap();
    sender2.send(4).unwrap();
});

println!("{} {} {} {}", receiver.recv().unwrap(), receiver.recv().unwrap(),
    receiver.recv().unwrap(), receiver.recv().unwrap());

// примерен изход: 3 4 1 2

Sender

методи

fn send(&self, t: T) -> Result<(), SendError<T>>

Sender

методи

let (sender, receiver) = mpsc::channel();

assert_eq!(sender.send(12), Ok(()));

// унищожаваме получателя
// съобщението `12` никога няма да бъде получено
mem::drop(receiver);

// грешка - получателя е унищожен
// можем да си върнем съобщението `23` от грешката
assert_eq!(sender.send(23), Err(SendError(23)));

SyncSender

методи

fn send(&self, t: T) -> Result<(), SendError<T>>
fn try_send(&self, t: T) -> Result<(), TrySendError<T>>

SyncSender

методи

let (sender, receiver) = mpsc::sync_channel(1);

assert_eq!(sender.try_send(12), Ok(()));
assert_eq!(sender.try_send(23), Err(TrySendError::Full(23)));

mem::drop(receiver);

assert_eq!(sender.try_send(23), Err(TrySendError::Disconnected(23)));

Множество получатели

Ако много искаме можем да си развием въображението

Receiver

методи

// блокира докато не получи съобщение
// връща грешка ако всички изпращачи са унищожени
fn recv(&self) -> Result<T, RecvError>

// не блокира
// връща грешка ако всички изпращачи са унищожени или няма съобщение в опашката
fn try_recv(&self) -> Result<T, TryRecvError>

// блокира за определено време
// връща грешка ако всички изпращачи са унищожени или е изтекло времето
fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>

Receiver

методи

let (sender, receiver) = mpsc::channel();
let sender2 = sender.clone();

sender.send(12).unwrap();
sender2.send(23).unwrap();
mem::drop(sender);

assert_eq!(receiver.try_recv(), Ok(12));
assert_eq!(receiver.try_recv(), Ok(23));
assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));

mem::drop(sender2);

assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));

Receiver

методи

let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    for i in (0..50).rev() {
        sender.send(i).unwrap();
    }
});

while let Ok(msg) = receiver.recv() {
    println!("received {}", msg);
}

Receiver

итератори

for msg in receiver.iter() {
    // ...
}

for msg in receiver.try_iter() {
    // ...
}
while let Ok(msg) = receiver.recv() {
    // ...
}

while let Ok(msg) = receiver.try_recv() {
    // ...
}

Networking

Стандартната библиотека имплементира networking примитиви в модула std::net

UDP

UdpSocket

use std::net::UdpSocket;

// сокета се затваря на края на scope-a
{
    let mut socket = UdpSocket::bind("127.0.0.1:34254")?;

    // Получава една дейтаграма от сокета. Ако буфера е прекалено малък за съобщението,
    // то ще бъде орязано.
    let mut buf = [0; 10];
    let (amt, src) = socket.recv_from(&mut buf)?;

    // Редекларира `buf` като слайс от получените данни и ги праща в обратен ред.
    let buf = &mut buf[..amt];
    buf.reverse();
    socket.send_to(buf, &src)?;
}

TCP

TcpStream

use std::io::prelude::*;
use std::net::TcpStream;

// стриймът се затваря на края на scope-a
{
    let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();

    let _ = stream.write(&[1]);
    let _ = stream.read(&mut [0; 128]);
}

TCP

TcpListener

use std::net::{TcpListener, TcpStream};

fn handle_client(stream: TcpStream) {
    // ...
}

let listener = TcpListener::bind("127.0.0.1:80").unwrap();

// примера конекции и ги обработва
for stream in listener.incoming() {
    handle_client(stream?);
}

TCP

Simple chat

Ще разгледаме проста чат система за демонстрация на нишки, канали и TCP

Пълния код може да се разгледа в Github

TCP

Simple chat

Какво няма да обхванем:

Simple chat

Server

const LOCALHOST: &str = "127.0.0.1:1234";
const MESSAGE_SIZE: usize = 32;

fn main() {
    let server = TcpListener::bind(LOCALHOST).expect("Listener failed to bind");
    server.set_nonblocking(true).expect("Failed to initialize nonblocking");

    // Stores client sockets
    let mut clients = Vec::new();
    let (sx, rx) = mpsc::channel::<String>();

    loop {
        /* accept */
        /* broadcast */
        thread::sleep(Duration::from_millis(100));
    }
}

Simple chat

Server

Използваме nonblocking sockets, за да може да не правим само accept в главната нишка

Server

Accepting clients

// Try to accept a client
if let Ok((mut socket, addr)) = server.accept() {
    println!("Client {} connected", addr);

    clients.push(socket.try_clone().expect("Failed to clone client"));

    let sx = sx.clone();
    thread::spawn(move || loop {
        /* try recv */
        thread::sleep(Duration::from_millis(100));
    });
}

/* message broadcast */

Server

Message broadcast

if let Ok(msg) = rx.try_recv() {
    // Try to send message from master channel
    clients = clients.into_iter().filter_map(|mut client| {
        let mut buf = msg.clone().into_bytes();
        buf.resize(MESSAGE_SIZE, 0);

        client.write_all(&buf).map(|_| client).ok()
    }).collect::<Vec<_>>();
}

Server

Receiver loop

let mut buf = vec![0; MESSAGE_SIZE];

// Try to receive message from client
match socket.read_exact(&mut buf) {
    Ok(_) => {
        let msg = buf.into_iter().take_while(|&x| x != 0).collect::<Vec<_>>();
        let msg = String::from_utf8(msg).expect("Invalid utf8 message");

        println!("{}: {:?}", addr, msg);
        sx.send(msg).expect("Send to master channel failed");
    },
    Err(ref err) if err.kind() == ErrorKind::WouldBlock => (),
    Err(_) => {
        println!("Closing connection with: {}", addr);
        break;
    }
}

Simple chat

Client

const LOCALHOST: &str = "127.0.0.1:1234";
const MESSAGE_SIZE: usize = 32;

fn main() {
    let mut client = TcpStream::connect(LOCALHOST).expect("Stream failed to connect");
    client.set_nonblocking(true).expect("Failed to initialize nonblocking");

    let (sx, rx) = mpsc::channel::<String>();

    thread::spawn(move || loop {
        /* try recv */
        /* try send */
        sleep();
    });

    /* repl */
}

Client

REPL

println!("Welcome!");
loop {
    let mut buf = String::new();
    io::stdin().read_line(&mut buf).expect("Reading form stdin failed");
    let msg = buf.trim().to_string();

    if msg == ":q" || sx.send(msg).is_err() { break }
}
println!("Bye!");

Client

receive message

Прилича много на кода в server-а

let mut buf = vec![0; MESSAGE_SIZE];

// Try to receive message from server
match client.read_exact(&mut buf) {
    Ok(_) => {
        let msg = buf.into_iter().take_while(|&x| x != 0).collect::<Vec<_>>();
        let msg = String::from_utf8(msg).expect("Invalid utf8 message");
        println!("message recv {:?}", msg);
    },
    Err(ref err) if err.kind() == ErrorKind::WouldBlock => (),
    Err(_) => {
        println!("Connection with the server closed");
        break;
    }
}

Client

send message

let mut buf = vec![0; MESSAGE_SIZE];

// Try to send message
match rx.try_recv() {
    Ok(msg) => {
        let mut buf = msg.clone().into_bytes();
        buf.resize(MESSAGE_SIZE, 0);
        client.write_all(&buf).expect("Writing to socket failed");
        println!("message sent {:?}", msg);
    },
    Err(TryRecvError::Empty) => (),
    Err(TryRecvError::Disconnected) => break
}

Въпроси