Tag Archives: TNonblockingServer

Apache Thrift 간단하게 알아보기

thrift_logo

Apache Thrift는 페이스북에서 개발한 다양한 언어를 지원하는 RPC 프레임워크입니다. 현재는 오픈소스 아파치 프로젝트로 등록되어있습니다. 이 포스트에서는 어떻게 Thrift 서비스를 개발하고 블록킹/논블록킹 또는 비동기 모드로 서버를 구축하는 방법에 대해 간략하게 정리해 보겠습니다.

아파치의 Thrift와 구글의 ProtoBuf를 비교하는 글을 보면 꼭 나오는 말중에 Thrift는 문서를 찾기가 힘들다는 말이 나오던데 정말 지금도 제대로된 레퍼런스를 찾아보기가 힘들더군요;; Cassandra 처음 다룰때가 생각나게 만드는 녀석이었습니다. 진행은 Homebrew가 설치되어있는 맥에서 진행하겠습니다.

Homebrew를 이용하여 Thrift를 설치하는 방법은 사실 할말이 없습니다. 다음의 명령을 통해 설치를 진행하도록 합니다.

$ brew update
$ brew install thrift

설치가 완료 되면 Thrift 스크립트를 작성해 보겠습니다. 이 스크립트를 한번 작성해 놓으면 다양한 언어에서 사용될 수 있는 코드를 자동으로 생성할 수 있습니다.

namespace java tutorial.arithmetic.gen  // define namespace for java code

typedef i64 long
typedef i32 int
service ArithmeticService {  // defines simple arithmetic service
long add(1:int num1, 2:int num2),
long multiply(1:int num1, 2:int num2),
}

가장 먼저 네임스페이스를 지정하는것을 볼 수 있습니다. 실제로 자바의 경우 동일한 package 구조로 생성이 되며 다른 네임스페이스를 지원하는 언어도 마찬가지로 될것입니다. 프로그래밍 언어별로 다른 네임스페이스 설정을 할 수 있으며 지원하는 언어는 현재 As3, C Glib, C++, CSharp, D, Delphi, Erlang, Go, Graphviz, Haskell, Java, Java Me, Javascript, Node.js, Objective-c, OCaml, Perl, PHP, Python, Ruby, Smalltalk 를 지원합니다. 엄청나게 많군요.

Thrift에서는 32비트/64비트 두가지 Integer 형을 사용할 수 있습니다. 위와 같이 typedef를 통해 재정의하는것도 가능합니다. 64비트 int는 long으로 지정하겠습니다.

Type Description
 bool  Boolean, one byte
 byte  Signed byte
 i16  Signed 16-bit integer
 i32  Signed 32-bit integer
 i64  Signed 64-bit integer
 double  64-bit floating point value
 string  String
 binary  Blob (byte array)
 map<t1,t2>  Map from one type to another
 list<t1>  Ordered list of one type
 set<t1>  Set of unique elements of one type

Thrift는 ProtoBuf에는 없는 Map, List, Set등을 지원합니다. 이후에 보여지는 Service가 서버-서버 또는 서버-클라이언트 통신을 가능케 하는 구현부를 담당합니다. 간단하게 생각해서 클래스와 메소드라고 생각하면 될것 같습니다.

이제 만들어 놓은 Thrift 스크립트를 이용하여 Java 코드를 생성해 보겠습니다.

$ thrift --gen java arithmetic.thrift

위의 명령을 수행하고 오류 없이 정상적으로 수행이 되었다면 gen-java디렉토리 안에 네임스페이스 구조에 맞추어(여기서는 tutorial.arithmetic.gen.ArithmeticService.java) 코드가 생성됩니다. 이렇게 생성된 코드를 이용하여 예제 코드를 작성하여 보겠습니다.

Blocking Mode

우선 블록킹 모드의 서버와 그에 상응하는 클라이언트 개발을 해보겠습니다. Thrift가 생성한 인터페이스(Skeleton)를 구현한 클래스의 작성이 필요합니다. 여기서 사용할 인터페이스의 이름은 AtithmeticService.Iface 입니다.

public class ArithmeticServiceImpl implements ArithmeticService.Iface {

    public long add(int num1, int num2) throws TException {
        return num1 + num2;
    }

    public long multiply(int num1, int num2) throws TException {
        return num1 * num2;
    }

}

아까 스크립트에서 정의했던 두개의 메소드에 대해 어떤 동작을 할것인지 구현을 완료하였습니다. 이번에는 이 서비스의 요청을 처리할 서버를 만들어 보겠습니다. TSimpleServer라는 간단한 구현체가 있지만 TThreadPoolServer를 이용해 보겠습니다. 이 서버는 블록킹 서버이며 각각의 요청을 처리하기 위해 개별 쓰레드가 작업을 처리하는동안 기다리게 됩니다.

public class Server {

    private void start() {
        try {
            TServerSocket serverTransport = new TServerSocket(7911);

            ArithmeticService.Processor processor = new ArithmeticService.Processor(new ArithmeticServiceImpl());

            TServer server = new TThreadPoolServer(new TThreadPoolServer.Args(serverTransport).
                    processor(processor));
            System.out.println("Starting server on port 7911 ...");
            server.serve();
        } catch (TTransportException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Server srv = new Server();
        srv.start();
    }

}

쓰레드풀을 이용하는 TThreadPoolServer의 구현이 완료되었습니다. 이번에는 클라이언트를 제작해 보겠습니다.

public class ArithmeticClient {

    private void invoke() {
        TTransport transport;
        try {
            transport = new TSocket("localhost", 7911);

            TProtocol protocol = new TBinaryProtocol(transport);

            ArithmeticService.Client client = new ArithmeticService.Client(protocol);
            transport.open();

            long addResult = client.add(100, 200);
            System.out.println("Add result: " + addResult);
            long multiplyResult = client.multiply(20, 40);
            System.out.println("Multiply result: " + multiplyResult);

            transport.close();
        } catch (TTransportException e) {
            e.printStackTrace();
        } catch (TException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ArithmeticClient c = new ArithmeticClient();
        c.invoke();

    }
}

여기서도 생각이상으로 할일이 없습니다. 잘만들어져있는(?) ArithmeticService.Client를 이용하여 서버와 통신하면 됩니다. add와 multiply 메소드를 실행해 보았습니다. TBinaryProtocol을 이용할 경우 객체를 그대로 시리얼라이즈 하여 서버와 통신하게 됩니다. 사용할 수 있는 프로토콜은 다음과 같습니다.

Protocol Description
TBinaryProtocol 단순하게  바이너리를 그대로 전송합니다. 공간 효율은 떨어지지만 다른 텍스트 프로토콜에 비해 처리가 빠릅니다. 하지만 디버그가 어렵습니다.
TCompactProtocol TBinaryProtocol의 공간효율을 좀 더 높인 프로토콜입니다. 일반적으로 좀 더 효율적으로 처리를 합니다.
TDebugProtocol 디버깅에 용이한 사람이 읽을 수 있는 텍스트형태로 전송합니다.
TDenseProtocol TCompactProtocol과 비슷하지만 전송과 관련된 메타 데이터를 제거한 프로토콜입니다.
TJSONProtocol 데이터를 JSON형태로 인코딩하여 전송합니다.

Non Blocking Mode

이번에는 논블록킹 모드로 동작하는 서버를 만들어보겠습니다. 서비스 구현부는 이전에 만들었던 ArithmeticServiceImpl를 재활용 하겠습니다.

public class NonblockingServer {

    private void start() {
        try {
            TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(7911);
            ArithmeticService.Processor processor = new ArithmeticService.Processor(new ArithmeticServiceImpl());

            TServer server = new TNonblockingServer(new TNonblockingServer.Args(serverTransport).
                    processor(processor));
            System.out.println("Starting server on port 7911 ...");
            server.serve();
        } catch (TTransportException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        NonblockingServer srv = new NonblockingServer();
        srv.start();
    }
}

TNonblockingServer를 사용하게 되면 TSimpleServer에서 하나의 요청을 처리하는동안 다른 요청이 모두 블록되는 문제를 해결할 수 있습니다. ServerSocketChannel을 이용하여 먼저 들어온 요청이 처리되는 중에도 다른 요청을 받을 수 있게 고안되었습니다. 이번에는 마찬가지로 논블록킹 모드로 동작할 수 있는 클라이언트를 만들겠습니다.

public class NonblockingClient {

    private void invoke() {
        TTransport transport;
        try {
            transport = new TFramedTransport(new TSocket("localhost", 7911));
            TProtocol protocol = new TBinaryProtocol(transport);

            ArithmeticService.Client client = new ArithmeticService.Client(protocol);
            transport.open();

            long addResult = client.add(100, 200);
            System.out.println("Add result: " + addResult);
            long multiplyResult = client.multiply(20, 40);
            System.out.println("Multiply result: " + multiplyResult);

            transport.close();
        } catch (TTransportException e) {
            e.printStackTrace();
        } catch (TException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        NonblockingClient c = new NonblockingClient();
        c.invoke();
    }

}

클라이언트는 평범한 TSocket을 TFramedTransport에 래핑한 형태로 사용하게 됩니다. 논블록킹 서버는 클라이언트가 TFramedTransport를 사용해야 합니다.

Asynchronous Mode

이번에는 비동기로 동작하는 클라이언트를 만들어보겠습니다. 요청이 성공하였을 경우 수행될 콜백을 미리 지정해 두는 과정이 필요합니다. 비동기 모드의 클라이언트는 블록킹 모드의 서버와 통신이 불가능합니다. (실행이 되어도 빈값이 반환됩니다) 클라이언트를 비동기로 동작시키기 위해서는 TNonblockingSocket을 사용해야 합니다. 또한 Thrift가 생성한 ArithmeticService.AsyncClient를 사용해야 합니다.

public class AsyncClient {

    private void invoke() {
        try {
            ArithmeticService.AsyncClient client = new ArithmeticService.
                    AsyncClient(new TBinaryProtocol.Factory(), new TAsyncClientManager(),
                                new TNonblockingSocket("localhost", 7911));

            client.add(200, 400, new AddMethodCallback());

            client = new ArithmeticService.
                    AsyncClient(new TBinaryProtocol.Factory(), new TAsyncClientManager(),
                                new TNonblockingSocket("localhost", 7911));
            client.multiply(20, 50, new MultiplyMethodCallback());

        } catch (TTransportException e) {
            e.printStackTrace();
        } catch (TException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        AsyncClient c = new AsyncClient();
        c.invoke();

    }

    class AddMethodCallback
            implements AsyncMethodCallback<ArithmeticService.AsyncClient.add_call> {

        public void onComplete(ArithmeticService.AsyncClient.add_call add_call) {
            try {
                long result = add_call.getResult();
                System.out.println("Add from server: " + result);
            } catch (TException e) {
                e.printStackTrace();
            }
        }

        public void onError(Exception e) {
            System.out.println("Error : ");
            e.printStackTrace();
        }

    }

    class MultiplyMethodCallback
            implements AsyncMethodCallback<ArithmeticService.AsyncClient.multiply_call> {

        public void onComplete(ArithmeticService.AsyncClient.multiply_call multiply_call) {
            try {
                long result = multiply_call.getResult();
                System.out.println("Multiply from server: " + result);
            } catch (TException e) {
                e.printStackTrace();
            }
        }

        public void onError(Exception e) {
            System.out.println("Error : ");
            e.printStackTrace();
        }

    }

}

위의 코드에서 특이한 점은 두개의 비동기 수행을 위해 두개의 클라이언트(client 인스턴스)를 만들었다는 점입니다. 비동기 모드에서는 하나의 클라이언트는 동시에 한개이상의 작업을 수행할 수 없습니다. 두번째 new ArithmeticService.AsyncClient(…) 를 수행하지 않고 곧바로 client.multiply(…)를 수행하였다면 “Client is currently executing another method” 예외가 발생하게 됩니다.

참고