rust로 작성할 수 있는 서버와 관련된 라이브러리 중 axum을 알아보고자 한다.

 

axum의 깃허브에 있는 예제 중 한가지를 통해서 어떤 식으로 코드가 작성되고 있는지 확인해보자.

//https://github.com/tokio-rs/axum/blob/main/examples/hello-world/src/main.rs

use axum::{response::Html, routing::get, Router};

#[tokio::main]
async fn main() {
    // build our application with a route
    let app = Router::new().route("/", get(handler));

    // run it
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
        .await
        .unwrap();
    println!("listening on {}", listener.local_addr().unwrap());
    axum::serve(listener, app).await.unwrap();
}

async fn handler() -> Html<&'static str> {
    Html("<h1>Hello, World!</h1>")
}

 

한줄 한줄 씹어 먹어 보도록 하자.

 

#[tokio::main]

내가 공부했던 자바스크립트에서는 node.js 혹은 브라우저가 비동기 환경을 만드는 런타임을 제공해준다.

 

하지만 러스트는 언어 자체에서 async/await 구문은 지원하지만 런타임은 내장되어 있지 않다.

 

따라서 비동기 작업의 실행을 위해서 외부 라이브러리를 사용하는데, 이 비동기 런타임을 생성하는 라이브러리 중에서 널리 사용되는 tokio를 사용했다.

 

tokio의 작동원리는 다음에 알아보도록 하자.

 

async fn main()

 

비동기 코드를 실행하기 위해서 메인 함수에 async를 붙였고, 위에서 설명한 것과 같이 #[tokio::main]을 선언해서 tokio의 런타임이 생성되면서 main 함수가 비동기로 실행이 된다.

 

 

let app = Router::new().route("/", get(handler));


pub struct Router<S = ()> {
    inner: Arc<RouterInner<S>>,
}


struct RouterInner<S> {
    path_router: PathRouter<S, false>,
    fallback_router: PathRouter<S, true>,
    default_fallback: bool,
    catch_all_fallback: Fallback<S>,
}


pub fn route(self, path: &str, method_router: MethodRouter<S>) -> Self {
    tap_inner!(self, mut this => {
        panic_on_err!(this.path_router.route(path, method_router));
    })
}

Router라는 구조체를 만들어서 route 메서드를 통해서 각각의 위치에 대해서 특정한 요청을 만들어 주는 코드이다.

 

Router를 확인해보면 내부의 Arc로 감싸서 멀티스레드 환경에서도 안전하게 공유가 되도록 만들어 놓았다.

 

route메서드에서 느낌표가 붙여진 코드들(tap_inner!,panic_on_err!)는 당장에 필자도 완벽하게 설명하기 어렵지만, panic_on_err! 매크로 내부 정도만 확인해보자.

 

현재의 this는 RouterInner를 가르키고 있는데, 이 RouterInner의 path_router 요소에 대해서 다시 route 메서드를 호출하도록 작성되어 있다.

 

route 메서드의 파라미터에 전달되는 path는  "/"에 해당하고, method_router는 get(handler)에 해당하게 된다.

 

pub(super) struct PathRouter<S, const IS_FALLBACK: bool> {
    routes: HashMap<RouteId, Endpoint<S>>,
    node: Arc<Node>,
    prev_route_id: RouteId,
    v7_checks: bool,
}


pub(super) fn route(
     &mut self,
     path: &str,
     method_router: MethodRouter<S>,
 ) -> Result<(), Cow<'static, str>> {
     validate_path(self.v7_checks, path)?;
     let endpoint = if let Some((route_id, Endpoint::MethodRouter(prev_method_router))) = self
         .node
         .path_to_route_id
         .get(path)
         .and_then(|route_id| self.routes.get(route_id).map(|svc| (*route_id, svc)))
     {
         // if we're adding a new `MethodRouter` to a route that already has one just
         // merge them. This makes `.route("/", get(_)).route("/", post(_))` work
         let service = Endpoint::MethodRouter(
             prev_method_router
                 .clone()
                 .merge_for_path(Some(path), method_router),
         );
         self.routes.insert(route_id, service);
         return Ok(());
     } else {
         Endpoint::MethodRouter(method_router)
     };

     let id = self.next_route_id();
     self.set_node(path, id)?;
     self.routes.insert(id, endpoint);

     Ok(())
}

RouterInner의 path_router 요소는 PathRouter 구조체로 되어있는데, 그것을 가져와 봤다.

 

여기서도 모든 것에 집중하기 보다는 PathRouter 구조체의 routes 요소가 HashMap으로 되어있고, 여기에 RouteId와 EndPoint가 들어간다는 것을 인지하자.

 

PathRouter의 route 메서드 전체를 이해하기는 힘들다.

 

대략적으로 설명하자면, 이전에 존재하던 MethodRouter가 있다면 여기다가 새로운 메서드를 추가한 후 병합을 해주는 과정을 거친면서 한 곳에서 관리할 수 있도록 한다.

 

만약에 존재하지 않는다면 신규로 MethodRouter를 반환한다.

 

이 MethodRouter를 PathRouter의 routes에다가 추가를 해주는 과정을 거치면 위에서 전달했던 path에 해당하는 router가 추가된다.

 

axum::serve(listener, app).await.unwrap();


#[cfg(all(feature = "tokio", any(feature = "http1", feature = "http2")))]
#[must_use = "futures must be awaited or polled"]
pub struct Serve<L, M, S> {
    listener: L,
    make_service: M,
    _marker: PhantomData<S>,
}


pub fn serve<L, M, S>(listener: L, make_service: M) -> Serve<L, M, S>
where
    L: Listener,
    M: for<'a> Service<IncomingStream<'a, L>, Error = Infallible, Response = S>,
    S: Service<Request, Response = Response, Error = Infallible> + Clone + Send + 'static,
    S::Future: Send,
{
    Serve {
        listener,
        make_service,
        _marker: PhantomData,
    }
}

serve 메서드 자체는 Serve라는 구조체를 반환할 뿐이다.

 

그러나 await이 붙여져있기에 다르게 보아야 한다.

 

#[cfg(all(feature = "tokio", any(feature = "http1", feature = "http2")))]
impl<L, M, S> IntoFuture for Serve<L, M, S>
where
    L: Listener,
    L::Addr: Debug,
    M: for<'a> Service<IncomingStream<'a, L>, Error = Infallible, Response = S> + Send + 'static,
    for<'a> <M as Service<IncomingStream<'a, L>>>::Future: Send,
    S: Service<Request, Response = Response, Error = Infallible> + Clone + Send + 'static,
    S::Future: Send,
{
    type Output = io::Result<()>;
    type IntoFuture = private::ServeFuture;

    fn into_future(self) -> Self::IntoFuture {
        self.with_graceful_shutdown(std::future::pending())
            .into_future()
    }
}

#[cfg(all(feature = "tokio", any(feature = "http1", feature = "http2")))]
impl<L, M, S, F> IntoFuture for WithGracefulShutdown<L, M, S, F>
where
    L: Listener,
    L::Addr: Debug,
    M: for<'a> Service<IncomingStream<'a, L>, Error = Infallible, Response = S> + Send + 'static,
    for<'a> <M as Service<IncomingStream<'a, L>>>::Future: Send,
    S: Service<Request, Response = Response, Error = Infallible> + Clone + Send + 'static,
    S::Future: Send,
    F: Future<Output = ()> + Send + 'static,
{
    type Output = io::Result<()>;
    type IntoFuture = private::ServeFuture;

    fn into_future(self) -> Self::IntoFuture {
        let Self {
            mut listener,
            mut make_service,
            signal,
            _marker: _,
        } = self;

        private::ServeFuture(Box::pin(async move {
            let (signal_tx, signal_rx) = watch::channel(());
            let signal_tx = Arc::new(signal_tx);
            tokio::spawn(async move {
                signal.await;
                trace!("received graceful shutdown signal. Telling tasks to shutdown");
                drop(signal_rx);
            });

            let (close_tx, close_rx) = watch::channel(());

            loop {
                let (io, remote_addr) = tokio::select! {
                    conn = listener.accept() => conn,
                    _ = signal_tx.closed() => {
                        trace!("signal received, not accepting new connections");
                        break;
                    }
                };

                let io = TokioIo::new(io);

                trace!("connection {remote_addr:?} accepted");

                poll_fn(|cx| make_service.poll_ready(cx))
                    .await
                    .unwrap_or_else(|err| match err {});

                let tower_service = make_service
                    .call(IncomingStream {
                        io: &io,
                        remote_addr,
                    })
                    .await
                    .unwrap_or_else(|err| match err {})
                    .map_request(|req: Request<Incoming>| req.map(Body::new));

                let hyper_service = TowerToHyperService::new(tower_service);

                let signal_tx = Arc::clone(&signal_tx);

                let close_rx = close_rx.clone();

                tokio::spawn(async move {
                    #[allow(unused_mut)]
                    let mut builder = Builder::new(TokioExecutor::new());
                    // CONNECT protocol needed for HTTP/2 websockets
                    #[cfg(feature = "http2")]
                    builder.http2().enable_connect_protocol();
                    let conn = builder.serve_connection_with_upgrades(io, hyper_service);
                    pin_mut!(conn);

                    let signal_closed = signal_tx.closed().fuse();
                    pin_mut!(signal_closed);

                    loop {
                        tokio::select! {
                            result = conn.as_mut() => {
                                if let Err(_err) = result {
                                    trace!("failed to serve connection: {_err:#}");
                                }
                                break;
                            }
                            _ = &mut signal_closed => {
                                trace!("signal received in task, starting graceful shutdown");
                                conn.as_mut().graceful_shutdown();
                            }
                        }
                    }

                    drop(close_rx);
                });
            }

            drop(close_rx);
            drop(listener);

            trace!(
                "waiting for {} task(s) to finish",
                close_tx.receiver_count()
            );
            close_tx.closed().await;

            Ok(())
        }))
    }
}

IntoFuture를 구현하게 되면 해당 타입을 Future라는 타입으로 변환할 수 있게 해준다.

 

이 Future에다가 await을 붙이게 되면 값이 반환될때까지 비동기로 기다릴 수 있게 만들어주는 것이다.

 

Serve 타입에 대해서도 IntoFuture가 구현되어 있고, serve 메서드의 내부에서 또 into_future를 호출하고 있다.

 

Serve 구조체의 into_future는 내부적으로 WithGracefulShutdown 타입으로 변환이 되고, 다시 이 타입의 into_future를 호출하게 된다.

 

마지막의 코드는 into_future 메서드에 대해서 설명하고 있는데, 이것도 대략적으로 작동 원리에 대해서만 살펴보고자 한다.

 

내부적으로는 tokio 라이브러리를 사용해서 계속해서 종료 신호가 오는지에 대해서 감시를 하게 된다.

 

아래에서는 tower_service라는 변수와 hyper_service라는 변수를 확인할 수 있다.

 

사실은 위의 axum 코드의 Router는 내부적으로 tower라는 라이브러리의 Service라는 트레이트를 구현을 해 놓았다.

 

이 tower의 Service를 hyper 라이브러리의 Service로 변환하는 과정을 거친 후에 tokio 기반의 서버에서 실행을 하게 된다.

 

 

 

이처럼 axum 자체는 엄청나게 많은 역할을 한다기 보다는 tokio나 tower나 hyper를 이어주는 역할을 해 사용자가 쉽게 서버를 구축할 수 있도록 돕는 역할을 한다고 볼 수 있다.

반응형

+ Recent posts