Pin

Future์— ๋Œ€ํ•ด await๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ ๊ทธ future๊ฐ€ ์ค€๋น„๋˜๊ธฐ๋ฅผ ๊ธฐ๋‹ค๋ฆด ๋•Œ, ๋ชจ๋“  ๋กœ์ปฌ ๋ณ€์ˆ˜(์ผ๋ฐ˜์ ์œผ๋กœ ์Šคํƒ ํ”„๋ ˆ์ž„์— ์ €์žฅ๋จ)๋Š” ๊ทธ future๊ฐ์ฒด ์•ˆ์— ์ €์žฅ๋ฉ๋‹ˆ๋‹ค. ๋งŒ์•ฝ ๊ทธ future์— ์Šคํƒ์— ์žˆ๋Š” ์–ด๋–ค ๋ฐ์ดํ„ฐ๋กœ์˜ ํฌ์ธํ„ฐ๊ฐ€ ์žˆ์œผ๋ฉด ์ด๋Ÿฌํ•œ ํฌ์ธํ„ฐ๋Š” ์˜ฌ๋ฐ”๋ฅด์ง€ ์•Š์„ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์•ˆ์ „ํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค.

๋”ฐ๋ผ์„œ future๊ฐ€ ๊ฐ€๋ฆฌํ‚ค๋Š” ์ฃผ์†Œ๊ฐ€ ๋ณ€๊ฒฝ๋˜์ง€ ์•Š๋„๋ก ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. ์ด๊ฒƒ์ด future๋ฅผ pin(๊ณ ์ •)ํ•ด์•ผ ํ•˜๋Š” ์ด์œ ์ž…๋‹ˆ๋‹ค. select!์—์„œ ๋™์ผํ•œ future๋ฅผ ๋ฐ˜๋ณต์ ์œผ๋กœ ์‚ฌ์šฉํ•˜๋ฉด ๊ณ ์ • ๊ฐ’ ๋ฌธ์ œ๊ฐ€ ๋ฐœ์ƒํ•˜๋Š” ๊ฒฝ์šฐ๊ฐ€ ๋งŽ์Šต๋‹ˆ๋‹ค.

use tokio::sync::{mpsc, oneshot};
use tokio::task::spawn;
use tokio::time::{sleep, Duration};

// A work item. In this case, just sleep for the given time and respond
// with a message on the `respond_on` channel.
#[derive(Debug)]
struct Work {
    input: u32,
    respond_on: oneshot::Sender<u32>,
}

// A worker which listens for work on a queue and performs it.
async fn worker(mut work_queue: mpsc::Receiver<Work>) {
    let mut iterations = 0;
    loop {
        tokio::select! {
            Some(work) = work_queue.recv() => {
                sleep(Duration::from_millis(10)).await; // Pretend to work.
                work.respond_on
                    .send(work.input * 1000)
                    .expect("failed to send response");
                iterations += 1;
            }
            // TODO: report number of iterations every 100ms
        }
    }
}

// A requester which requests work and waits for it to complete.
async fn do_work(work_queue: &mpsc::Sender<Work>, input: u32) -> u32 {
    let (tx, rx) = oneshot::channel();
    work_queue
        .send(Work {
            input,
            respond_on: tx,
        })
        .await
        .expect("failed to send on work queue");
    rx.await.expect("failed waiting for response")
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(10);
    spawn(worker(rx));
    for i in 0..100 {
        let resp = do_work(&tx, i).await;
        println!("work result for iteration {i}: {resp}");
    }
}
  • ์œ„์—์„œ ์†Œ๊ฐœํ•œ ๊ฒƒ์€ ์•กํ„ฐ(actor) ํŒจํ„ด์˜ ํ•œ ์˜ˆ๋ผ๊ณ  ๋ด๋„ ๋ฌด๋ฐฉํ•ฉ๋‹ˆ๋‹ค. ์•กํ„ฐ๋Š” ์ผ๋ฐ˜์ ์œผ๋กœ ๋ฃจํ”„ ์•ˆ์—์„œ select!๋ฅผ ํ˜ธ์ถœํ•ฉ๋‹ˆ๋‹ค.

  • ์ด์ „ ๊ฐ•์˜ ๋ช‡ ๊ฐœ์˜ ๋‚ด์šฉ์„ ์š”์•ฝํ•œ ๊ฒƒ์ด๊ธฐ ๋•Œ๋ฌธ์— ์ฒœ์ฒœํžˆ ์‚ดํŽด๋ณด์„ธ์š”.

    • _ = sleep(Duration::from_millis(100)) => { println!(..) }์„ select!์— ์ถ”๊ฐ€ํ•ด ๋ณด์„ธ์š”. ์ด ์ž‘์—…์€ ์‹คํ–‰๋˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค. ์™œ ๊ทธ๋Ÿด๊นŒ์š”?

    • ๋Œ€์‹ , ํ•ด๋‹น future๊ฐ€ ํฌํ•จ๋œ timeout_fut๋ฅผ loop ์™ธ๋ถ€์— ์ถ”๊ฐ€ํ•ด ๋ณด์„ธ์š”.

      #![allow(unused)]
      fn main() {
      let mut timeout_fut = sleep(Duration::from_millis(100));
      loop {
          select! {
              ..,
              _ = timeout_fut => { println!(..); },
          }
      }
      }
    • ์—ฌ์ „ํžˆ ์ž‘๋™ํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค. ์ปดํŒŒ์ผ๋Ÿฌ ์˜ค๋ฅ˜๋ฅผ ๋”ฐ๋ผ select!์˜ timeout_fut์— &mut๋ฅผ ์ถ”๊ฐ€ํ•˜์—ฌ Move ์‹œ๋ฉ˜ํ‹ฑ ๊ด€๋ จํ•œ ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•˜๊ณ  Box::pin์„ ์‚ฌ์šฉํ•˜์„ธ์š”.

      #![allow(unused)]
      fn main() {
      let mut timeout_fut = Box::pin(sleep(Duration::from_millis(100)));
      loop {
          select! {
              ..,
              _ = &mut timeout_fut => { println!(..); },
          }
      }
      }
    • ์ด๋Š” ์ปดํŒŒ์ผ์€ ๋˜์ง€๋งŒ ํƒ€์ž„ ์•„์›ƒ์ด ๋˜๋ฉด ๋งค๋ฒˆ ๋ฐ˜๋ณตํ•  ๋•Œ ๋งˆ๋‹ค Poll::Ready๊ฐ€ ๋ฉ๋‹ˆ๋‹ค(์œตํ•ฉ๋œ future๊ฐ€ ๋„์›€์ด ๋  ์ˆ˜ ์žˆ์Œ). ํƒ€์ž„ ์•„์›ƒ ๋  ๋•Œ๋งˆ๋‹ค timeout_fut๋ฅผ ๋ฆฌ์…‹ํ•˜๋„๋ก ์ˆ˜์ •ํ•˜์„ธ์š”.

  • Box๋Š” ํž™์— ํ• ๋‹นํ•ฉ๋‹ˆ๋‹ค. ๊ฒฝ์šฐ์— ๋”ฐ๋ผ std::pin::pin!(์ตœ๊ทผ์—์•ผ ์•ˆ์ •ํ™”๋˜์—ˆ์œผ๋ฉฐ ์ด์ „ ์ฝ”๋“œ๋Š” tokio::pin!์„ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ๊ฐ€ ๋งŽ์Œ)๋„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์ง€๋งŒ ์ด๋Š” ์žฌํ• ๋‹น๋œ future์— ์‚ฌ์šฉํ•˜๊ธฐ๊ฐ€ ์–ด๋ ต์Šต๋‹ˆ๋‹ค.

  • ๋˜ ๋‹ค๋ฅธ ๋ฐฉ๋ฒ•์€ pin์„ ์•„์˜ˆ ์‚ฌ์šฉํ•˜์ง€ ์•Š๊ณ  100ms๋งˆ๋‹ค oneshot ์ฑ„๋„์— ์ „์†กํ•  ๋‹ค๋ฅธ ์ž‘์—…์„ ์ƒ์„ฑํ•˜๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค.