Tag Archives: apache

Apache Thrift를 Android 통신에 활용하기

thrift_logo

Apache Thrift는 다양한 플랫폼간에 매우 편리하게 사용할 수 있는 통합 RPC 환경을 제공합니다. 제가 실무에서 겪을 수 있는 대부분의 Thrift활용 예는 서버들간의 통신들에 국한되어 있었는데요. 검색을 해봐도 Thrift는 서버에만 사용해야 한다는 말은 없더군요. 단지 이것도 하나의 프로토콜에 불과한것이 아닐까 생각됩니다.

그래서 Thrift로 Server ⟷ Android간 통신에 적용해 보기로 하였습니다. 지금까지는 이러한 통신에 HTTP 통신을 이용한 Json을 주고받는 형태로 많이 구현해 봤었는데요, 통신을 위해 서로 프로토콜을 맞추고 인코딩에 신경쓰고 오류메시지에 신경쓰고 하는 부분이 싹 사라졌습니다.

namespace java kr.pe.theeye.thrift.android

service ArithmeticService {
    i32 add(1:i32 num1, 2:i32 num2),
    i32 multiply(1:i32 num1, 2:i32 num2)
}

위와 같은 thrift 파일을 생성하였습니다. 그리고 다음과 같이 Java 코드를 생성할 수 있습니다.

$ thrift --gen java example.thrift

gen-java 디렉토리 밑에 ArithmeticService.java 파일이 생성되었을 것입니다. 이것을 서버와 클라이언트 프로젝트에 동시에 사용하겠습니다. 먼저 서버 프로젝트부터 만들어 보겠습니다. Gradle 프로젝트를 생성합니다.

apply plugin: 'java'

sourceCompatibility = 1.7
version = '1.0'

repositories {
    mavenCentral()
}

dependencies {
    compile 'org.apache.thrift:libthrift:+'
    compile 'org.slf4j:slf4j-api:+'
    compile 'org.slf4j:slf4j-jdk14:+'
}

생성된 ArithmeticService 클래스를 사용하여 적절한 핸들러 클래스를 만들어 보겠습니다.

public class ArithmeticServiceImpl implements ArithmeticService.Iface {

    @Override
    public int add(int num1, int num2) throws TException {
        return num1 + num2;
    }

    @Override
    public int multiply(int num1, int num2) throws TException {
        return num1 * num2;
    }
}

이번에는 서버 클래스를 작성하여 보겠습니다.

public class ThriftThreadPoolServer {

    private void start() {
        try {
            ArithmeticService.Processor processor = new ArithmeticService.Processor(new ArithmeticServiceImpl());

            TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(new TServerSocket(7911));
            serverArgs.protocolFactory(new TCompactProtocol.Factory());
            serverArgs.transportFactory(new TFastFramedTransport.Factory());
            serverArgs.minWorkerThreads(20);
            serverArgs.maxWorkerThreads(1500);
            serverArgs.processorFactory(new TProcessorFactory(processor));

            TServer server = new TThreadPoolServer(serverArgs);
            System.out.println("Starting server on port 7911 ...");
            server.serve();
        } catch (TTransportException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ThriftThreadPoolServer srv = new ThriftThreadPoolServer();
        srv.start();
    }
}

ThreadPool 서버로 세팅을 하였습니다. 프로토콜은 좀 더 압축률이 높은 CompactProtocol을 사용하였고 FastFramedTransport를 활용하여 통신하려고 합니다. 쓰레드는 최소 20개에서 최대 1500개를 생성하도록 하였습니다. 간단하게 생각해서 동접 1500개 제한이라고 생각하시면 될듯 합니다. 마지막으로 먼저 만들어 두었던 ArithmeticServiceImpl을 RPC 콜이 왔을때 대응할 프로세서(핸들러)로 설정하였습니다.

이번엔 안드로이드 클라이언트를 만들어 보겠습니다. 기본적으로 Fragment를 사용하지 않은 단일 Activity구조로 프로젝트를 만들었습니다. 먼저 Gradle 설정부터 하겠습니다.

apply plugin: 'android'

android {
    compileSdkVersion 19
    buildToolsVersion "19.0.1"

    defaultConfig {
        minSdkVersion 7
        targetSdkVersion 19
        versionCode 1
        versionName "1.0"
    }
    buildTypes {
        release {
            runProguard false
            proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.txt'
        }
    }
    packagingOptions {
        exclude 'META-INF/LICENSE.txt'
        exclude 'META-INF/NOTICE.txt'
    }
}

dependencies {
    compile 'com.android.support:appcompat-v7:+'
    compile 'org.apache.thrift:libthrift:+'
    compile 'org.slf4j:slf4j-api:+'
    compile 'org.slf4j:slf4j-jdk14:+'
    compile fileTree(dir: 'libs', include: ['*.jar', '*.aar'])
}

통신에는 다양한 방법을 사용할 수 있지만 모바일 환경이기에 커넥션을 맺고 끊는것에 신경을 쓰기 어려우므로 비동기 방식으로 구현을 해보겠습니다. 미리 만들어둔 ArithmeticService 클래스가 프로젝트에 포함되어있다고 가정하고 진행하겠습니다.

구현해둔 RPC 메소드는 덧셈, 곱셈 총 두가지 입니다. 이 두가지에 대한 콜백 클래스를 제작합니다.

class AddMethodCallBack implements AsyncMethodCallback<ArithmeticService.AsyncClient.add_call> {

    @Override
    public void onComplete(ArithmeticService.AsyncClient.add_call add_call) {
        try {
            int result = add_call.getResult();
            Log.e(TAG, "AddMethodCallBack onComplete: " + result);
        } catch (TException e) {
            Log.e(TAG, "AddMethodCallBack TException: " + e.getLocalizedMessage());
        }
    }

    @Override
    public void onError(Exception e) {
        Log.e(TAG, "AddMethodCallBack onError: " + e.getLocalizedMessage());
    }
}

class MultiplyMethodCallBack implements AsyncMethodCallback<ArithmeticService.AsyncClient.multiply_call> {

    @Override
    public void onComplete(ArithmeticService.AsyncClient.multiply_call multiply_call) {
        try {
            int result = multiply_call.getResult();
            Log.e(TAG, "MultiplyMethodCallBack onComplete: " + result);
        } catch (TException e) {
            Log.e(TAG, "MultiplyMethodCallBack TException: " + e.getLocalizedMessage());
        }
    }

    @Override
    public void onError(Exception e) {
        Log.e(TAG, "MultiplyMethodCallBack onError: " + e.getLocalizedMessage());
    }
}

이제 비동기 통신을 담당하는 통신 클라이언트 객체를 생성하는 코드를 만들어 보겠습니다.

public class MainActivity extends ActionBarActivity {

    private static final String TAG = "MainActivity";

    private static final String SERVER_HOST_NAME = "192.168.0.10";
    private static final int SERVER_HOST_PORT = 7911;

    private TCompactProtocol.Factory mProtocolFactory;
    private TAsyncClientManager mAsyncClientManager;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        try {
            mProtocolFactory = new TCompactProtocol.Factory();
            mAsyncClientManager = new TAsyncClientManager();
        } catch (IOException e) {
            Log.e(TAG, "Thrift Client Initialization Failed.");
            Log.e(TAG, e.getLocalizedMessage());
        }
    }

    private ArithmeticService.AsyncClient getAsyncClient() throws IOException {
        return new ArithmeticService.AsyncClient(
                mProtocolFactory,
                mAsyncClientManager,
                new TNonblockingSocket(SERVER_HOST_NAME, SERVER_HOST_PORT));
    }
}

클라이언트가 비동기로 통신하기 위해서는 반드시 NonBlockingSocket을 사용해야만 합니다. 그런데 한가지 문제가 이 NonBlockingSocket은 실제로 동시에 두개의 처리를 할수가 없습니다. 자동으로 채널 셀렉팅을 해주지 않을까 생각했는데 실제로 그렇게 동작하지 않더군요. 그래서 매 요청시마다 커넥션은 새로 만들어 주어야 합니다. 이러한 과정을 getAsyncClient() 메소드에서 처리하도록 하였습니다.

이제 서버와 통신을 해보겠습니다. RPC의 특성상 클라이언트의 메소드를 호출하면 서버의 그것이 실행되어 결과값이 콜백으로 반환됩니다.

try {
    getAsyncClient().add(200, 400, new AddMethodCallBack());
    getAsyncClient().multiply(20, 50, new MultiplyMethodCallBack());
} catch (Exception e) {
    Log.e(TAG, e.getLocalizedMessage());
}

제작한 샘플 코드를 첨부하였습니다.

Apache Thrift 간단하게 알아보기

thrift_logo

Apache Thrift는 페이스북에서 개발한 다양한 언어를 지원하는 RPC 프레임워크입니다. 현재는 오픈소스 아파치 프로젝트로 등록되어있습니다. 이 포스트에서는 어떻게 Thrift 서비스를 개발하고 블록킹/논블록킹 또는 비동기 모드로 서버를 구축하는 방법에 대해 간략하게 정리해 보겠습니다.

아파치의 Thrift와 구글의 ProtoBuf를 비교하는 글을 보면 꼭 나오는 말중에 Thrift는 문서를 찾기가 힘들다는 말이 나오던데 정말 지금도 제대로된 레퍼런스를 찾아보기가 힘들더군요;; Cassandra 처음 다룰때가 생각나게 만드는 녀석이었습니다. 진행은 Homebrew가 설치되어있는 맥에서 진행하겠습니다.

Homebrew를 이용하여 Thrift를 설치하는 방법은 사실 할말이 없습니다. 다음의 명령을 통해 설치를 진행하도록 합니다.

$ brew update
$ brew install thrift

설치가 완료 되면 Thrift 스크립트를 작성해 보겠습니다. 이 스크립트를 한번 작성해 놓으면 다양한 언어에서 사용될 수 있는 코드를 자동으로 생성할 수 있습니다.

namespace java tutorial.arithmetic.gen  // define namespace for java code

typedef i64 long
typedef i32 int
service ArithmeticService {  // defines simple arithmetic service
long add(1:int num1, 2:int num2),
long multiply(1:int num1, 2:int num2),
}

가장 먼저 네임스페이스를 지정하는것을 볼 수 있습니다. 실제로 자바의 경우 동일한 package 구조로 생성이 되며 다른 네임스페이스를 지원하는 언어도 마찬가지로 될것입니다. 프로그래밍 언어별로 다른 네임스페이스 설정을 할 수 있으며 지원하는 언어는 현재 As3, C Glib, C++, CSharp, D, Delphi, Erlang, Go, Graphviz, Haskell, Java, Java Me, Javascript, Node.js, Objective-c, OCaml, Perl, PHP, Python, Ruby, Smalltalk 를 지원합니다. 엄청나게 많군요.

Thrift에서는 32비트/64비트 두가지 Integer 형을 사용할 수 있습니다. 위와 같이 typedef를 통해 재정의하는것도 가능합니다. 64비트 int는 long으로 지정하겠습니다.

Type Description
 bool  Boolean, one byte
 byte  Signed byte
 i16  Signed 16-bit integer
 i32  Signed 32-bit integer
 i64  Signed 64-bit integer
 double  64-bit floating point value
 string  String
 binary  Blob (byte array)
 map<t1,t2>  Map from one type to another
 list<t1>  Ordered list of one type
 set<t1>  Set of unique elements of one type

Thrift는 ProtoBuf에는 없는 Map, List, Set등을 지원합니다. 이후에 보여지는 Service가 서버-서버 또는 서버-클라이언트 통신을 가능케 하는 구현부를 담당합니다. 간단하게 생각해서 클래스와 메소드라고 생각하면 될것 같습니다.

이제 만들어 놓은 Thrift 스크립트를 이용하여 Java 코드를 생성해 보겠습니다.

$ thrift --gen java arithmetic.thrift

위의 명령을 수행하고 오류 없이 정상적으로 수행이 되었다면 gen-java디렉토리 안에 네임스페이스 구조에 맞추어(여기서는 tutorial.arithmetic.gen.ArithmeticService.java) 코드가 생성됩니다. 이렇게 생성된 코드를 이용하여 예제 코드를 작성하여 보겠습니다.

Blocking Mode

우선 블록킹 모드의 서버와 그에 상응하는 클라이언트 개발을 해보겠습니다. Thrift가 생성한 인터페이스(Skeleton)를 구현한 클래스의 작성이 필요합니다. 여기서 사용할 인터페이스의 이름은 AtithmeticService.Iface 입니다.

public class ArithmeticServiceImpl implements ArithmeticService.Iface {

    public long add(int num1, int num2) throws TException {
        return num1 + num2;
    }

    public long multiply(int num1, int num2) throws TException {
        return num1 * num2;
    }

}

아까 스크립트에서 정의했던 두개의 메소드에 대해 어떤 동작을 할것인지 구현을 완료하였습니다. 이번에는 이 서비스의 요청을 처리할 서버를 만들어 보겠습니다. TSimpleServer라는 간단한 구현체가 있지만 TThreadPoolServer를 이용해 보겠습니다. 이 서버는 블록킹 서버이며 각각의 요청을 처리하기 위해 개별 쓰레드가 작업을 처리하는동안 기다리게 됩니다.

public class Server {

    private void start() {
        try {
            TServerSocket serverTransport = new TServerSocket(7911);

            ArithmeticService.Processor processor = new ArithmeticService.Processor(new ArithmeticServiceImpl());

            TServer server = new TThreadPoolServer(new TThreadPoolServer.Args(serverTransport).
                    processor(processor));
            System.out.println("Starting server on port 7911 ...");
            server.serve();
        } catch (TTransportException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Server srv = new Server();
        srv.start();
    }

}

쓰레드풀을 이용하는 TThreadPoolServer의 구현이 완료되었습니다. 이번에는 클라이언트를 제작해 보겠습니다.

public class ArithmeticClient {

    private void invoke() {
        TTransport transport;
        try {
            transport = new TSocket("localhost", 7911);

            TProtocol protocol = new TBinaryProtocol(transport);

            ArithmeticService.Client client = new ArithmeticService.Client(protocol);
            transport.open();

            long addResult = client.add(100, 200);
            System.out.println("Add result: " + addResult);
            long multiplyResult = client.multiply(20, 40);
            System.out.println("Multiply result: " + multiplyResult);

            transport.close();
        } catch (TTransportException e) {
            e.printStackTrace();
        } catch (TException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ArithmeticClient c = new ArithmeticClient();
        c.invoke();

    }
}

여기서도 생각이상으로 할일이 없습니다. 잘만들어져있는(?) ArithmeticService.Client를 이용하여 서버와 통신하면 됩니다. add와 multiply 메소드를 실행해 보았습니다. TBinaryProtocol을 이용할 경우 객체를 그대로 시리얼라이즈 하여 서버와 통신하게 됩니다. 사용할 수 있는 프로토콜은 다음과 같습니다.

Protocol Description
TBinaryProtocol 단순하게  바이너리를 그대로 전송합니다. 공간 효율은 떨어지지만 다른 텍스트 프로토콜에 비해 처리가 빠릅니다. 하지만 디버그가 어렵습니다.
TCompactProtocol TBinaryProtocol의 공간효율을 좀 더 높인 프로토콜입니다. 일반적으로 좀 더 효율적으로 처리를 합니다.
TDebugProtocol 디버깅에 용이한 사람이 읽을 수 있는 텍스트형태로 전송합니다.
TDenseProtocol TCompactProtocol과 비슷하지만 전송과 관련된 메타 데이터를 제거한 프로토콜입니다.
TJSONProtocol 데이터를 JSON형태로 인코딩하여 전송합니다.

Non Blocking Mode

이번에는 논블록킹 모드로 동작하는 서버를 만들어보겠습니다. 서비스 구현부는 이전에 만들었던 ArithmeticServiceImpl를 재활용 하겠습니다.

public class NonblockingServer {

    private void start() {
        try {
            TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(7911);
            ArithmeticService.Processor processor = new ArithmeticService.Processor(new ArithmeticServiceImpl());

            TServer server = new TNonblockingServer(new TNonblockingServer.Args(serverTransport).
                    processor(processor));
            System.out.println("Starting server on port 7911 ...");
            server.serve();
        } catch (TTransportException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        NonblockingServer srv = new NonblockingServer();
        srv.start();
    }
}

TNonblockingServer를 사용하게 되면 TSimpleServer에서 하나의 요청을 처리하는동안 다른 요청이 모두 블록되는 문제를 해결할 수 있습니다. ServerSocketChannel을 이용하여 먼저 들어온 요청이 처리되는 중에도 다른 요청을 받을 수 있게 고안되었습니다. 이번에는 마찬가지로 논블록킹 모드로 동작할 수 있는 클라이언트를 만들겠습니다.

public class NonblockingClient {

    private void invoke() {
        TTransport transport;
        try {
            transport = new TFramedTransport(new TSocket("localhost", 7911));
            TProtocol protocol = new TBinaryProtocol(transport);

            ArithmeticService.Client client = new ArithmeticService.Client(protocol);
            transport.open();

            long addResult = client.add(100, 200);
            System.out.println("Add result: " + addResult);
            long multiplyResult = client.multiply(20, 40);
            System.out.println("Multiply result: " + multiplyResult);

            transport.close();
        } catch (TTransportException e) {
            e.printStackTrace();
        } catch (TException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        NonblockingClient c = new NonblockingClient();
        c.invoke();
    }

}

클라이언트는 평범한 TSocket을 TFramedTransport에 래핑한 형태로 사용하게 됩니다. 논블록킹 서버는 클라이언트가 TFramedTransport를 사용해야 합니다.

Asynchronous Mode

이번에는 비동기로 동작하는 클라이언트를 만들어보겠습니다. 요청이 성공하였을 경우 수행될 콜백을 미리 지정해 두는 과정이 필요합니다. 비동기 모드의 클라이언트는 블록킹 모드의 서버와 통신이 불가능합니다. (실행이 되어도 빈값이 반환됩니다) 클라이언트를 비동기로 동작시키기 위해서는 TNonblockingSocket을 사용해야 합니다. 또한 Thrift가 생성한 ArithmeticService.AsyncClient를 사용해야 합니다.

public class AsyncClient {

    private void invoke() {
        try {
            ArithmeticService.AsyncClient client = new ArithmeticService.
                    AsyncClient(new TBinaryProtocol.Factory(), new TAsyncClientManager(),
                                new TNonblockingSocket("localhost", 7911));

            client.add(200, 400, new AddMethodCallback());

            client = new ArithmeticService.
                    AsyncClient(new TBinaryProtocol.Factory(), new TAsyncClientManager(),
                                new TNonblockingSocket("localhost", 7911));
            client.multiply(20, 50, new MultiplyMethodCallback());

        } catch (TTransportException e) {
            e.printStackTrace();
        } catch (TException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        AsyncClient c = new AsyncClient();
        c.invoke();

    }

    class AddMethodCallback
            implements AsyncMethodCallback<ArithmeticService.AsyncClient.add_call> {

        public void onComplete(ArithmeticService.AsyncClient.add_call add_call) {
            try {
                long result = add_call.getResult();
                System.out.println("Add from server: " + result);
            } catch (TException e) {
                e.printStackTrace();
            }
        }

        public void onError(Exception e) {
            System.out.println("Error : ");
            e.printStackTrace();
        }

    }

    class MultiplyMethodCallback
            implements AsyncMethodCallback<ArithmeticService.AsyncClient.multiply_call> {

        public void onComplete(ArithmeticService.AsyncClient.multiply_call multiply_call) {
            try {
                long result = multiply_call.getResult();
                System.out.println("Multiply from server: " + result);
            } catch (TException e) {
                e.printStackTrace();
            }
        }

        public void onError(Exception e) {
            System.out.println("Error : ");
            e.printStackTrace();
        }

    }

}

위의 코드에서 특이한 점은 두개의 비동기 수행을 위해 두개의 클라이언트(client 인스턴스)를 만들었다는 점입니다. 비동기 모드에서는 하나의 클라이언트는 동시에 한개이상의 작업을 수행할 수 없습니다. 두번째 new ArithmeticService.AsyncClient(…) 를 수행하지 않고 곧바로 client.multiply(…)를 수행하였다면 “Client is currently executing another method” 예외가 발생하게 됩니다.

참고