Песочница →
            Диспетчер произвольных сообщений на базе google protocol buffers
        
                    
            
                
      Появился свободный день, и я решил поиграться с библиотекой google::protobuf. Данная библиотека предоставляет возможность кодирования и декодирования структурированных данных. На базе этой библиотеки я построю простенький диспетчер, который может обрабатывать любые сообщения. Необычность данного диспетчера состоит в том, что он не будет знать типы передаваемых сообщений, и будет обрабатывать сообщения только с помощью зарегистрированных обработчиков.
Итак, сначала вкратце рассмотрим библиотеку google::protobuf, она поставляется в виде двух компонент:
собственно, сама библиотека + заголовочные файлы
компилятор файлов *.proto — генерирует из описания сообщения C++ класс (также есть возможность генерации для других языков программирования: Java, Python и т.д.)
В отдельном файле создается описание сообщения, из которого будет сгенерирован класс, синтаксис очень простой:
Компилятор автоматически генерирует C++ код для сериализации и десериализации подобных сообщений. Библиотека protobuf также предоставляет дополнительные возможности: сериализация в файл, в поток, в буфер.
Я использую CMake в качестве системы сборки, и в нем уже есть поддержка protobuf:
Все делается автоматически, и никаких дополнительных приседаний делать не надо (Под *nix может понадобиться дополнительный пакет Threads и соответствующий флаг линковщику).
Я решил попробовать написать диспетчер сообщений, который принимает какое-то сообщение, вызывает соответствующий обработчик и отправляет ответ на полученное сообщение. При этом диспетчер не должен знать типы передаваемых ему сообщений. Это может быть необходимо в случае, если диспетчер добавляет или удаляет соответствующие обработчики в процессе работы (например, подгрузив соответствующий модель расширения, *.dll, *.so).
Для того чтобы обрабатывать произвольные сообщения, у нас должен быть класс, который обрабатывает абстрактное сообщение. Очевидно, если у нас будут описания сообщений в *.proto файле, то компилятор нам сгенерирует соответствующие классы, но к сожалению все они будут наследованы от google::protobuf::Message. У данного класса проблематично вытащить все данные из сообщения (сделать это в принципе можно, но тогда мы будем делать кучу лишней работы), к тому же мы не будем знать, как нам сформировать ответ.
На помощь приходит высказывание: «Любую проблему можно решить путём введения дополнительного уровня абстракции, кроме проблемы слишком большого количества уровней абстракции».
Нам надо отделить определение типа сообщения от самого сообщения, мы это можем сделать следующим способом:
Итак, реализация очень проста:
Ну и напоследок приведу пример использования данного диспетчера, допустим у нас есть два вида сообщений:
P.S. Для написания данной статьи использовались:
Пример выложен на github
        
        Краткое описание библиотеки protobuf
Итак, сначала вкратце рассмотрим библиотеку google::protobuf, она поставляется в виде двух компонент:
собственно, сама библиотека + заголовочные файлы
компилятор файлов *.proto — генерирует из описания сообщения C++ класс (также есть возможность генерации для других языков программирования: Java, Python и т.д.)
В отдельном файле создается описание сообщения, из которого будет сгенерирован класс, синтаксис очень простой:
package sample.proto;
message ServerStatusAnswer {
    optional int32 threadCount = 1;
    repeated string listeners = 2;
}
- threadCount — необязательный целочисленный параметр
- listeners — необязательная строка, которая может несколько раз повторяться
ServerStatusAnswer {
    threadCount = 3
    listeners = {
        "one",
        "two"
    }
}
Компилятор автоматически генерирует C++ код для сериализации и десериализации подобных сообщений. Библиотека protobuf также предоставляет дополнительные возможности: сериализация в файл, в поток, в буфер.
Я использую CMake в качестве системы сборки, и в нем уже есть поддержка protobuf:
cmake_minimum_required(VERSION 2.8)
project(ProtobufTests)
find_package(Protobuf REQUIRED)
include_directories(${PROTOBUF_INCLUDE_DIRS})
include_directories(${CMAKE_CURRENT_BINARY_DIR})
#...
set (ProtobufTestsProtoSources
    Message.proto
    ServerStatus.proto
    Echo.proto
)
#...
PROTOBUF_GENERATE_CPP(PROTO_SRCS PROTO_HDRS ${ProtobufTestsProtoSources})
add_executable(ProtobufTests ${ProtobufTestsSources} ${PROTO_SRCS} ${PROTO_HDRS})
target_link_libraries(ProtobufTests
    #...
    ${PROTOBUF_LIBRARY}
)
Все делается автоматически, и никаких дополнительных приседаний делать не надо (Под *nix может понадобиться дополнительный пакет Threads и соответствующий флаг линковщику).
Описание диспетчера
Я решил попробовать написать диспетчер сообщений, который принимает какое-то сообщение, вызывает соответствующий обработчик и отправляет ответ на полученное сообщение. При этом диспетчер не должен знать типы передаваемых ему сообщений. Это может быть необходимо в случае, если диспетчер добавляет или удаляет соответствующие обработчики в процессе работы (например, подгрузив соответствующий модель расширения, *.dll, *.so).
Для того чтобы обрабатывать произвольные сообщения, у нас должен быть класс, который обрабатывает абстрактное сообщение. Очевидно, если у нас будут описания сообщений в *.proto файле, то компилятор нам сгенерирует соответствующие классы, но к сожалению все они будут наследованы от google::protobuf::Message. У данного класса проблематично вытащить все данные из сообщения (сделать это в принципе можно, но тогда мы будем делать кучу лишней работы), к тому же мы не будем знать, как нам сформировать ответ.
На помощь приходит высказывание: «Любую проблему можно решить путём введения дополнительного уровня абстракции, кроме проблемы слишком большого количества уровней абстракции».
Нам надо отделить определение типа сообщения от самого сообщения, мы это можем сделать следующим способом:
package sample.proto;
message Message {
    required string id = 1;
    optional bytes data = 2;
}
- обязательное поле id содержит уникальный идентификатор сообщения
- необязательное поле data содержит наше сообщение
#ifndef MESSAGEDISPATCHER_H
#define MESSAGEDISPATCHER_H
#include <map>
#include <stdexcept>
#include <boost/noncopyable.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include "message.pb.h"
class MessageProcessingError: public std::runtime_error
{
public:
    MessageProcessingError(const std::string & e): std::runtime_error(e)
    {
    }
};
class MessageProcessorBase: private boost::noncopyable
{
public:
    virtual ~MessageProcessorBase()
    {
    }
    virtual std::string id() const = 0;
    virtual sample::proto::Message process(const sample::proto::Message & query) = 0;
};
typedef boost::shared_ptr<MessageProcessorBase> MessageProcessorBasePtr;
class MessageDispatcher
{
public:
    MessageDispatcher();
    void addProcessor(MessageProcessorBasePtr processor);
    sample::proto::Message dispatch(const sample::proto::Message & query);
    typedef std::map<std::string, MessageProcessorBasePtr> DispatcherImplType;
    const DispatcherImplType & impl() const;
private:
    DispatcherImplType mImpl;
};
#endif // MESSAGEDISPATCHER_H
Итак, реализация очень проста:
template <typename ProtoQueryT, typename ProtoAnswerT>
class ProtoMessageProcessor: public MessageProcessorBase
{
public:
    virtual sample::proto::Message process(const sample::proto::Message & query)
    {
        ProtoQueryT underlyingQuery;
        if (!underlyingQuery.ParseFromString(query.data()))
        {
            throw MessageProcessingError("Failed to parse query: " +
                query.ShortDebugString());
        }
        ProtoAnswerT underlyingAnswer = doProcessing(underlyingQuery);
        sample::proto::Message a;
        a.set_id(query.id());
        if (!underlyingAnswer.SerializeToString(a.mutable_data()))
        {
            throw MessageProcessingError("Failed to prepare answer: " +
                underlyingAnswer.ShortDebugString());
        }
        return a;
    }
private:
    virtual ProtoAnswerT doProcessing(const ProtoQueryT & query) = 0;
};
Ну и напоследок приведу пример использования данного диспетчера, допустим у нас есть два вида сообщений:
package sample.proto;
message ServerStatusQuery {
}
message ServerStatusAnswer {
    optional int32 threadCount = 1;
    repeated string listeners = 2;
}
package sample.proto;
message EchoQuery {
    required string msg = 1;
}
message EchoAnswer {
    required string echo = 1;
}
#ifndef SERVERSTATUSMESSAGEPROCESSOR_H
#define SERVERSTATUSMESSAGEPROCESSOR_H
#include "MessageDispatcher.h"
#include "ServerStatus.pb.h"
class ServerStatusMessageProcessor:
        public ProtoMessageProcessor<sample::proto::ServerStatusQuery, sample::proto::ServerStatusAnswer>
{
public:
    typedef sample::proto::ServerStatusQuery query_type;
    typedef sample::proto::ServerStatusAnswer answer_type;
    ServerStatusMessageProcessor(MessageDispatcher * dispatcher);
    virtual std::string id() const;
private:
    MessageDispatcher * mDispatcher;
    virtual answer_type doProcessing(const query_type & query);
};
#endif // SERVERSTATUSMESSAGEPROCESSOR_H
#include "ServerStatusMessageProcessor.h"
using namespace sample::proto;
ServerStatusMessageProcessor::ServerStatusMessageProcessor(MessageDispatcher * dispatcher)
    : mDispatcher(dispatcher)
{
}
std::string ServerStatusMessageProcessor::id() const
{
    return "ServerStatus";
}
ServerStatusAnswer ServerStatusMessageProcessor::doProcessing(const ServerStatusQuery & query)
{
    ServerStatusAnswer s;
    s.set_threadcount(10);
    typedef MessageDispatcher::DispatcherImplType::const_iterator md_iterator;
    const MessageDispatcher::DispatcherImplType & mdImpl = mDispatcher->impl();
    for (md_iterator it = mdImpl.begin(); it != mdImpl.end(); ++it)
    {
        s.add_listeners(it->first);
    }
    return s;
}
#include "MessageDispatcher.h"
#include "ServerStatusMessageProcessor.h"
#include "EchoMessageProcessor.h"
#include <iostream>
#include <boost/smart_ptr/make_shared.hpp>
using namespace sample::proto;
int main()
{
    try
    {
        MessageDispatcher md;
        md.addProcessor(boost::make_shared<ServerStatusMessageProcessor>(&md));
        md.addProcessor(boost::make_shared<EchoMessageProcessor>());
        Message q;
        q.set_id("ServerStatus");
        Message ans = md.dispatch(q);
        std::cout << "query:  " << q.DebugString() << std::endl;
        std::cout << "answer: " << ans.DebugString() << std::endl;
    }
    catch (const std::exception & e)
    {
        std::cerr << e.what() << std::endl;
    }
    return 0;
}
P.S. Для написания данной статьи использовались:
- gcc-4.4.5-linux
- cmake-2.8.2
- boost-1.42
- protobuf-2.3.0
Пример выложен на github
    
      24.02.2012 19:23+0400
    
        
            
            
            
        