16. Нишки

16. Нишки

16. Нишки

1 декември 2017

Административни неща

Преговор

Преговор

Миналият път малко ви излъгахме за closures

Как се прихващат променливите зависи от това как се използват

let nums = vec![0, 1, 2, 3];

// прихваща `nums` като `&Vec<i32>`
let f1 = || {
    for n in &nums {
        println!("{}", n);
    }
};

// прихваща `nums` като `Vec<i32>`
let f2 = || {
    for n in nums {
        println!("{}", n);
    }
};

Преговор

move премества стойността, независимо как се използва

let nums = vec![0, 1, 2, 3];

// прихваща `nums` като `Vec<i32>`
let f3 = move || {
    for n in &nums {
        println!("{}", n);
    }
};

Преговор

Няколко трика ако искаме да преместим някоя стойност, но да прихванем друга по референция

let nums = vec![0, 1, 2, 3];
let s = String::from("баба");

let f = || {
    let nums = nums;        // move `nums`

    println!("{:?}", nums);
    println!("{:?}", s);
};

// println!("{:?}", nums);   // комп. грешка
println!("{:?}", s);

Преговор

let nums = vec![0, 1, 2, 3];
let s = String::from("баба");

{
    let s = &s;             // move `s: &String`

    let f = move || {
        println!("{:?}", nums);
        println!("{:?}", s);
    };
}

// println!("{:?}", nums);   // комп. грешка
println!("{:?}", s);

Event emitter

Миналият път написахме прост event emitter

Да си припомним какъв интерфейс имаше

Event emitter

struct EventEmitter<E, P> where E: Eq + Hash, P: Clone {
    next_id: Id,
    map: HashMap<E, Vec<Listener<P>>>
}

impl<E, P> EventEmitter<E, P> where E: Eq + Hash, P: Clone {
    fn new() -> Self { ... }

    /// Регистрира слушател
    fn on<F>(&mut self, event: E, listener: F) -> Id where F: Fn(P) + 'static { ... }

    /// Премахва слушател
    fn off(&mut self, id: Id) -> bool { ... }

    /// Излъчва съобщение с данни
    fn emit<B>(&self, event: B, payload: P) -> bool where B: Borrow<E> { ... }
}

Event emitter

type Id = u64;

struct Listener<P> {
    id: Id,
    closure: Box<Fn(P) + 'static>
}

impl<P> Listener<P> {
    fn new<F>(id: Id, f: F) -> Self where F: Fn(P) + 'static { ... }
}

Event emitter

Тази имплементация имаше проблем - какво става ако искаме да подадем данните за съобщението през референция

fn main() {
    let mut emitter = EventEmitter::new();
    emitter.on("boot", |p: &str| println!("{}", p));

    let data = "woot".to_string();
    emitter.emit("boot", &data);
}

Event emitter

error[E0597]: `data` does not live long enough
  --> src/main.rs:177:1
    |
176 |     emitter.emit("boot", &data);
    |                           ---- borrow occurs here
177 | }
    | ^ `data` dropped here while still borrowed
    |
    = note: values in a scope are dropped in the opposite order they are created 

Защо?

Event emitter

Още по-странно, ако преместим data над emitter, всичко работи

fn main() {
    let data = "woot".to_string();

    let mut emitter = EventEmitter::new();
    emitter.on("boot", |p: &str| println!("{}", p));

    emitter.emit("boot", &data);
}

Event emitter

emitter няма lifetime, но иска да живее повече от data...

... а дали е така?

Event emitter

emitter.emit("boot", &data);

fn emit<B>(&self, event: B, payload: P) -> bool where B: Borrow<E> { ... }

// => P = &'a str

Event emitter

fn main() {
    let mut emitter = EventEmitter::new();            // EventEmitter<E = ?, P = ?>
    emitter.on("boot", |p: &str| println!("{}", p));  // EventEmitter<&'static str, &'? str>

    let data = "woot".to_string();
    emitter.emit("boot", &data);                      // EventEmitter<&'static str, &'a str>
}

P ограничава колко може да живее emitter

Event emitter

Решението: P -> &P

fn emit<B>(&self, event: B, payload: &P) -> bool where B: Borrow<E> { ... }

struct Listener<P> {
    id: Id,
    closure: Box<Fn(&P) + 'static>
}

Event emitter

error[E0277]: the trait bound `str: std::marker::Sized` is not satisfied
  --> src/main.rs:175:13
    |
175 |     emitter.on("boot", |p: &str| println!("{}", p));
    |             ^^ `str` does not have a constant size known at compile-time
    |
    = help: the trait `std::marker::Sized` is not implemented for `str` 

Event emitter

Използваме P само зад референция (&P) затова можем да добавим "ограничение" ?Sized

struct Listener<P> where P: ?Sized {
    id: Id,
    closure: Box<Fn(&P) + 'static>
}

struct EventEmitter<E, P> where E: Eq + Hash, P: ?Sized {
    next_id: Id,
    map: HashMap<E, Vec<Listener<P>>>
}

Event emitter

woot 

Работи!

Closures

Аргументи с lifetime

Closures

Аргументи с lifetime

Fearless concurrency

Нишки

use std::thread;

fn main() {
    thread::spawn(|| println!("hi from the spawned thread"));

    println!("hi from the main thread");
}

Примерен изход

hi from the main thread 

Нишки

Програмата приключва когато главната нишка завърши

use std::thread;

fn main() {
    let handle = thread::spawn(|| println!("hi from the spawned thread"));

    println!("hi from the main thread");
    handle.join();
}
hi from the main thread
hi from the spawned thread 

Нишки

use std::thread;

fn main() {
    let nums = (0..100).collect::<Vec<_>>();

    let handle = thread::spawn(|| {
        for i in &nums {
            println!("number {} from the spawned thread", i);
        }
    });

    handle.join();
}

Нишки

error[E0373]: closure may outlive the current function, but it borrows `nums`, which is owned
              by the current function
--> src/main.rs:6:36
  |
6 |         let handle = thread::spawn(|| {
  |                                    ^^ may outlive borrowed value `nums`
7 |             for i in &nums {
  |                       ---- `nums` is borrowed here
  |
help: to force the closure to take ownership of `nums` (and any other referenced variables),
      use the `move` keyword
  |
6 |         let handle = thread::spawn(move || {
  |                                    ^^^^^^^ 

Нишки

pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,

Нишки

use std::thread::Builder;

let nums = (0..100).collect::<Vec<_>>();

let handle = Builder::new()
    .name("sirespawn".to_string())
    .spawn(move || {
        for i in &nums {
            println!("number {} from the spawned thread", i);
        }
        nums
    })
    .expect("could not create thread");

let nums = handle.join().expect("sirespawn panicked!");
println!("{:?}", nums);

Нишки

use std::thread;
use std::time::Duration;

let handle = thread::spawn(|| {
    println!("before park");
    thread::park();
    println!("after park");
});

thread::sleep(Duration::from_secs(1));
handle.thread().unpark();
let _ = handle.join();

Send + Sync

Send - позволява прехвърляне на собственост между нишки

Sync - позволява споделяне между няколко нишки през референция &T

Send + Sync

Send

Send + Sync

Sync

Send + Sync

имплементация

struct MyBox(*mut u8);

unsafe impl Send for MyBox {}
unsafe impl Sync for MyBox {}

Send + Sync

деимплементация

// Само на nightly
#![feature(optin_builtin_traits)]

struct SpecialToken(u8);

impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}

Send + Sync

деимплементация

Хак ако не използваме nightly rust

use std::marker::PhantomData;

struct SpecialToken(u8, PhantomData<*const ()>);

Arc

Arc

use std::sync::Arc;
use std::thread;

fn main() {
    let shared = Arc::new(123);

    for _ in 0..3 {
        let shared = Arc::clone(&shared);

        thread::spawn(move || {
            let data = *shared;
            println!("{}", data)
        });
    }
}

Mutex

use std::sync::Mutex;

fn main() {
    let mutex = Mutex::new(10);

    {
        // lock the mutex
        // `lock` is a smart pointer which derefs to `&T` and `&mut T`
        let mut lock = mutex.lock().unwrap();

        *lock += 32;

        // mutex is unlocked when `lock` is dropped (`lock` is a RAII guard)
    }
}

Други

Event emitter

За упражнение ще разширим event emitter-а от миналата лекция така, че да може да се използва от много нишки

impl<E, P> EventEmitter<E, P> where E: Eq + Hash {
    fn new() -> Arc<Mutex<Self>> {
        Arc::new(Mutex::new(Self {
            next_id: Id::default(),
            map: HashMap::new()
        }))
    }
}

panic

panic

Atomics

Съществуват и атомарни стандартни типове

Atomics

use std::sync::atomic::{AtomicIsize, Ordering};

let some_isize = AtomicIsize::new(5);

some_isize.store(10, Ordering::Relaxed);
assert_eq!(some_isize.load(Ordering::Relaxed), 10);

Atomics

Какво прави Ordering?

pub enum Ordering {
  Relaxed,
  SeqCst,
  // some variants omitted
}

`SeqCst` (sequentially consistent) не позволява нито инструкции за четене нито за писане да бъдат размествани нито преди, нито след реда с нашата инструкция.

Atomics

Има и разни полезни методи като compare_and_swap, но оставяме на вас да си четете документацията, ако ще ги ползвате.

Еvent emitter

От миналата лекция видяхме как може да си направим прост EventEmitter

Async event emitter

В същия дух ще направим един който да вика слушателите асинхронно

let mut emitter = AsyncEventEmitter::<&str, u32>::new();

emitter.on("boot", |p| println!("{}", p));
emitter.on("woot", |p| println!("{}", p));

emitter.emit("boot", 1).unwrap();
emitter.emit("woot", 2).unwrap();
emitter.emit("boot", 3).unwrap();

Async event emitter

Нека да пробваме най-базовото нещо:

use std::thread;

fn emit<B>(&self, event: B, payload: P) -> bool where B: Borrow<E> {
    let event = event.borrow();

    match self.map.get(event) {
        Some(listeners) => {
            thread::spawn(|| {
                listeners.iter().for_each(|f| (f.closure)(payload.clone()));
            });
            true
        },
        None => false
    }
}

Async event emitter

error[E0277]: the trait bound `P: std::marker::Sync` is not satisfied
  --> src\main.rs:166:17
    |
166 |     thread::spawn(|| {
    |     ^^^^^^^^^^^^^ `P` cannot be shared between threads safely
    |

error[E0277]: the trait bound `std::ops::Fn(P) + 'static: std::marker::Sync` is not satisfied
  --> src\main.rs:166:17
    |
166 |     thread::spawn(|| {
    |     ^^^^^^^^^^^^^ `std::ops::Fn(P) + 'static` cannot be shared between threads safely
    | 

Async event emitter

А какво, ако послушаме компилатора?

Async event emitter

error[E0495]: cannot infer an appropriate lifetime for autoref due to conflicting requirements
  --> src\main.rs:164:24
    |
164 |         match self.map.get(event) {
    |                        ^^^
    |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 159:5...
  --> src\main.rs:159:5
    |
159 | /     fn emit<B>(&self, event: B, payload: P) -> bool where B: Borrow {
160 | |         use std::thread;
161 | |
162 | |         let event = event.borrow();
...   |
172 | |         }
173 | |     }
    | |_____^
note: ...so that reference does not outlive borrowed content
  --> src\main.rs:164:15
    |
164 |         match self.map.get(event) {
    |               ^^^^^^^^
    = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `[closure@src\main.rs:166:31: 168:18 listeners:&&std::vec::Vec>, payload:&P]` will meet its required lifetime bounds
  --> src\main.rs:166:17
    |
166 |                 thread::spawn(|| {
    |                 ^^^^^^^^^^^^^ 

Async event emitter

Май ще си покопаме..

Async event emitter

Може би да пробваме с Arc и Mutex

use std::sync::{Arc, Mutex};

struct EventEmitter<E, P> where E: Eq + Hash, P: Clone {
    next_id: Id,
    map: HashMap<E, Arc<Mutex<Vec<Listener<P>>>>>
}

Async event emitter

Тогава трябва да минем и заместим listeners променливите с това

let listeners = arc.lock().expect("Something went wrong")

Async event emitter

За on и off е тривиално, ами emit?

fn emit<B>(&self, event: B, payload: P) -> bool where B: Borrow<E> {
    let event = event.borrow();

    match self.map.get(event) {
        Some(arc) => {
            thread::spawn(|| {
                let listeners = arc.lock().expect("Something went wrong");
                listeners.iter().for_each(|f| (f.closure)(payload.clone()));
            });
            true
        },
        None => false
    }
}

Async event emitter

Май имаме напредък?

error[E0277]: the trait bound `P: std::marker::Send` is not satisfied in
`[closure@src\main.rs:178:31: 181:18 arc:&std::sync::Arc>>>, payload:P]`
  --> src\main.rs:178:17
    |
178 |     thread::spawn(|| {
    |     ^^^^^^^^^^^^^ `P` cannot be sent between threads safely
    |
    required by `std::thread::spawn`

error[E0277]: the trait bound `std::ops::Fn(P) + 'static: std::marker::Send` is not satisfied
  --> src\main.rs:178:17
    |
178 |     thread::spawn(|| {
    |     ^^^^^^^^^^^^^ `std::ops::Fn(P) + 'static` cannot be sent between threads safely
    |
    required by `Arc` 

Async event emitter

Нека пробваме пак, но този път компилатора иска Send

Async event emitter

Офф пак това..

error[E0277]: the trait bound `P: std::marker::Sync` is not satisfied
  --> src\main.rs:178:17
    |
178 |     thread::spawn(|| {
    |     ^^^^^^^^^^^^^ `P` cannot be shared between threads safely
    | 

Async event emitter

closure-а се опитва да прихване променливите по референция

thread::spawn(|| {
    /* ... */
});

Async event emitter

На нас не ни трябва това, move е достатъчно добре

fn emit<B>(&self, event: B, payload: P) -> bool where B: Borrow<E> {
    let event = event.borrow();

    match self.map.get(event) {
        Some(arc) => {
            thread::spawn(move || {
                let listeners = arc.lock().expect("Something went wrong");
                listeners.iter().for_each(|f| (f.closure)(payload.clone()));
            });
            true
        },
        None => false
    }
}

Async event emitter

error[E0495]: cannot infer an appropriate lifetime for autoref due to conflicting requirements
  --> src\main.rs:176:24
    |
176 |         match self.map.get(event) {
    |                        ^^^
    |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 165:5...
  --> src\main.rs:165:5
    |
165 | /     fn emit<B>(&self, event: B, payload: P) -> bool where B: Borrow {
166 | |         let event = event.borrow();
...   |
185 | |         }
186 | |     }
    | |_____^
note: ...so that reference does not outlive borrowed content
  --> src\main.rs:176:15
    |
176 |         match self.map.get(event) {
    |               ^^^^^^^^
    = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `[closure@src\main.rs:178:31: 181:18 arc:&std::sync::Arc>>>, payload:P]` will meet its required lifetime bounds
  --> src\main.rs:178:17
    |
178 |                 thread::spawn(move || {
    | 

Async event emitter

Изглежда изродско, но просто иска да ни каже, че референцията дето ни дава map.get не може да я премести тъй като вероятно ще надживее main нишката.

Async event emitter

Решението е просто да клонираме Arc

match self.map.get(event) {
    Some(arc) => {
        let arc = arc.clone();
        thread::spawn(move || {
            let listeners = arc.lock().expect("Something went wrong");
            listeners.iter().for_each(|f| (f.closure)(payload.clone()));
        });
        true
    },
    None => false
}

Async event emitter

👏

Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
Running `target\debug\async_event_emitter.exe` 

Async event emitter

Супер, нека видим как ще го ползваме това

fn main() {
    let mut emitter = AsyncEventEmitter::new();

    emitter.on("boot", |p: String| println!("{}", p));
    let id = emitter.on("boot", |_| println!("hi"));

    emitter.off(id);

    emitter.emit("boot", "woot".to_string());
}
woot 

Async event emitter

Изглежда добре :)

Async event emitter

Или пък не..

fn main() {
    let mut emitter = AsyncEventEmitter::<&str, u32>::new();

    emitter.on("boot", |p| println!("{}", p));
    emitter.on("woot", |p| println!("{}", p));

    emitter.emit("boot", 1).unwrap();
    emitter.emit("woot", 2).unwrap();
    emitter.emit("boot", 3).unwrap();
}
1 

Async event emitter

Пак?

1
2 

Async event emitter

Изглежда главната ни нишка приключва рано

Async event emitter

Може да преправим emit метода да ни връща JoinHandle

fn emit<B>(&self, event: B, payload: P) -> Option<JoinHandle<()>> where B: Borrow<E> {
    let event = event.borrow();

    self.map.get(event).map(|arc| {
        let arc = arc.clone();
        thread::spawn(move || {
            let listeners = arc.lock().expect("Something went wrong");
            listeners.iter().for_each(|f| (f.closure)(payload.clone()));
        })
    })
}

Async event emitter

Така може да постигнем нещо подобно на

fn main() {
    let mut emitter = AsyncEventEmitter::<&str, u32>::new();
    emitter.on("boot", |p| println!("{}", p));
    emitter.on("woot", |p| println!("{}", p));

    let mut handles = Vec::new();

    for _ in 0..2 {
        handles.push(emitter.emit("boot", 1).unwrap());
        handles.push(emitter.emit("woot", 2).unwrap());
        handles.push(emitter.emit("boot", 3).unwrap());
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

Async event emitter

1
3
2
1
3
2 

Въпроси