Яндекс.Метрика

Песочница

Диспетчер произвольных сообщений на базе google protocol buffers

Появился свободный день, и я решил поиграться с библиотекой google::protobuf. Данная библиотека предоставляет возможность кодирования и декодирования структурированных данных. На базе этой библиотеки я построю простенький диспетчер, который может обрабатывать любые сообщения. Необычность данного диспетчера состоит в том, что он не будет знать типы передаваемых сообщений, и будет обрабатывать сообщения только с помощью зарегистрированных обработчиков.

Краткое описание библиотеки protobuf


Итак, сначала вкратце рассмотрим библиотеку google::protobuf, она поставляется в виде двух компонент:
собственно, сама библиотека + заголовочные файлы
компилятор файлов *.proto — генерирует из описания сообщения C++ класс (также есть возможность генерации для других языков программирования: Java, Python и т.д.)
В отдельном файле создается описание сообщения, из которого будет сгенерирован класс, синтаксис очень простой:
package sample.proto;

message ServerStatusAnswer {
    optional int32 threadCount = 1;
    repeated string listeners = 2;
}
Здесь мы описываем сообщение ServerStatusAnswer, которое имеет два необязательных поля:
  • threadCount — необязательный целочисленный параметр
  • listeners — необязательная строка, которая может несколько раз повторяться
Данному описанию удовлетворяет, например, следующее сообщение:
ServerStatusAnswer {
    threadCount = 3
    listeners = {
        "one",
        "two"
    }
}
На самом деле формат protobuf — бинарный, здесь я привел сообщение в читаемом формате только для удобства восприятия

Компилятор автоматически генерирует C++ код для сериализации и десериализации подобных сообщений. Библиотека protobuf также предоставляет дополнительные возможности: сериализация в файл, в поток, в буфер.

Я использую CMake в качестве системы сборки, и в нем уже есть поддержка protobuf:
cmake_minimum_required(VERSION 2.8)

project(ProtobufTests)

find_package(Protobuf REQUIRED)
include_directories(${PROTOBUF_INCLUDE_DIRS})
include_directories(${CMAKE_CURRENT_BINARY_DIR})
#...
set (ProtobufTestsProtoSources
    Message.proto
    ServerStatus.proto
    Echo.proto
)
#...
PROTOBUF_GENERATE_CPP(PROTO_SRCS PROTO_HDRS ${ProtobufTestsProtoSources})
add_executable(ProtobufTests ${ProtobufTestsSources} ${PROTO_SRCS} ${PROTO_HDRS})

target_link_libraries(ProtobufTests
    #...
    ${PROTOBUF_LIBRARY}
)
PROTOBUF_GENERATE_CPP — данный макрос вызывает компилятор protoc для каждого *.proto файла, и генерирует соответствующие cpp и h файлы, которые добавляются к сборке.
Все делается автоматически, и никаких дополнительных приседаний делать не надо (Под *nix может понадобиться дополнительный пакет Threads и соответствующий флаг линковщику).

Описание диспетчера


Я решил попробовать написать диспетчер сообщений, который принимает какое-то сообщение, вызывает соответствующий обработчик и отправляет ответ на полученное сообщение. При этом диспетчер не должен знать типы передаваемых ему сообщений. Это может быть необходимо в случае, если диспетчер добавляет или удаляет соответствующие обработчики в процессе работы (например, подгрузив соответствующий модель расширения, *.dll, *.so).

Для того чтобы обрабатывать произвольные сообщения, у нас должен быть класс, который обрабатывает абстрактное сообщение. Очевидно, если у нас будут описания сообщений в *.proto файле, то компилятор нам сгенерирует соответствующие классы, но к сожалению все они будут наследованы от google::protobuf::Message. У данного класса проблематично вытащить все данные из сообщения (сделать это в принципе можно, но тогда мы будем делать кучу лишней работы), к тому же мы не будем знать, как нам сформировать ответ.
На помощь приходит высказывание: «Любую проблему можно решить путём введения дополнительного уровня абстракции, кроме проблемы слишком большого количества уровней абстракции».
Нам надо отделить определение типа сообщения от самого сообщения, мы это можем сделать следующим способом:
package sample.proto;

message Message {
    required string id = 1;
    optional bytes data = 2;
}
Мы запакуем наше сообщение внутрь еще одного сообщения:
  • обязательное поле id содержит уникальный идентификатор сообщения
  • необязательное поле data содержит наше сообщение
Таким образом, наш диспетчер будет по полю id искать соответствующий обработчик сообщения:
#ifndef MESSAGEDISPATCHER_H
#define MESSAGEDISPATCHER_H

#include <map>
#include <stdexcept>

#include <boost/noncopyable.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>

#include "message.pb.h"

class MessageProcessingError: public std::runtime_error
{
public:
    MessageProcessingError(const std::string & e): std::runtime_error(e)
    {
    }
};

class MessageProcessorBase: private boost::noncopyable
{
public:

    virtual ~MessageProcessorBase()
    {
    }

    virtual std::string id() const = 0;

    virtual sample::proto::Message process(const sample::proto::Message & query) = 0;
};

typedef boost::shared_ptr<MessageProcessorBase> MessageProcessorBasePtr;

class MessageDispatcher
{
public:
    MessageDispatcher();

    void addProcessor(MessageProcessorBasePtr processor);

    sample::proto::Message dispatch(const sample::proto::Message & query);

    typedef std::map<std::string, MessageProcessorBasePtr> DispatcherImplType;

    const DispatcherImplType & impl() const;

private:
    DispatcherImplType mImpl;
};

#endif // MESSAGEDISPATCHER_H
Но теперь мы получаем, что каждый обработчик должен проводить распаковку сообщения sample::proto::Message в свое собственное сообщение. А этот процесс будет дублироваться для каждого такого обработчика. Мы хотим избежать дублирования кода, поэтому возьмем паттерн Type Erasure. Данный паттерн позволяет скрыть тип обрабатываемой сущности за общим интерфейсом, однако каждый обработчик будет работать с конкретным типом, известным только ему.

Итак, реализация очень проста:
template <typename ProtoQueryT, typename ProtoAnswerT>
class ProtoMessageProcessor: public MessageProcessorBase
{
public:
    virtual sample::proto::Message process(const sample::proto::Message & query)
    {
        ProtoQueryT underlyingQuery;
        if (!underlyingQuery.ParseFromString(query.data()))
        {
            throw MessageProcessingError("Failed to parse query: " +
                query.ShortDebugString());
        }

        ProtoAnswerT underlyingAnswer = doProcessing(underlyingQuery);

        sample::proto::Message a;
        a.set_id(query.id());

        if (!underlyingAnswer.SerializeToString(a.mutable_data()))
        {
            throw MessageProcessingError("Failed to prepare answer: " +
                underlyingAnswer.ShortDebugString());
        }
        return a;
    }

private:
    virtual ProtoAnswerT doProcessing(const ProtoQueryT & query) = 0;
};
Мы определяем виртуальную функцию process, но также добавляем виртуальную функцию doProcess, которая уже работает с нашими конкретными сообщениями! Данный прием основан на механизме инстанцирования шаблонов: типы подставляются в момент реального использования шаблона, а не в момент декларации. А так как данный класс наследуется от MessageProcessorBase, то мы смело можем передавать наследников данного класса в наш диспетчер. Также необходимо заметить, что данный класс осуществляет сериализацию и десериализацию наших конкретных сообщений и кидает исключения в случае возникновения ошибок.

Ну и напоследок приведу пример использования данного диспетчера, допустим у нас есть два вида сообщений:
package sample.proto;

message ServerStatusQuery {
}

message ServerStatusAnswer {
    optional int32 threadCount = 1;
    repeated string listeners = 2;
}

package sample.proto;

message EchoQuery {
    required string msg = 1;
}

message EchoAnswer {
    required string echo = 1;
}
Как видно из описания — данные сообщения запрашивают у сервера его внутреннее состояние (ServerStatus), и просто возвращает полученный запрос (Echo). Реализация самих обработчиков тривиальна, я приведу реализацию только ServerStatus:
#ifndef SERVERSTATUSMESSAGEPROCESSOR_H
#define SERVERSTATUSMESSAGEPROCESSOR_H

#include "MessageDispatcher.h"

#include "ServerStatus.pb.h"

class ServerStatusMessageProcessor:
        public ProtoMessageProcessor<sample::proto::ServerStatusQuery, sample::proto::ServerStatusAnswer>
{
public:

    typedef sample::proto::ServerStatusQuery query_type;
    typedef sample::proto::ServerStatusAnswer answer_type;

    ServerStatusMessageProcessor(MessageDispatcher * dispatcher);

    virtual std::string id() const;

private:
    MessageDispatcher * mDispatcher;

    virtual answer_type doProcessing(const query_type & query);
};

#endif // SERVERSTATUSMESSAGEPROCESSOR_H
Сама реализация:
#include "ServerStatusMessageProcessor.h"

using namespace sample::proto;

ServerStatusMessageProcessor::ServerStatusMessageProcessor(MessageDispatcher * dispatcher)
    : mDispatcher(dispatcher)
{
}

std::string ServerStatusMessageProcessor::id() const
{
    return "ServerStatus";
}

ServerStatusAnswer ServerStatusMessageProcessor::doProcessing(const ServerStatusQuery & query)
{
    ServerStatusAnswer s;

    s.set_threadcount(10);

    typedef MessageDispatcher::DispatcherImplType::const_iterator md_iterator;

    const MessageDispatcher::DispatcherImplType & mdImpl = mDispatcher->impl();

    for (md_iterator it = mdImpl.begin(); it != mdImpl.end(); ++it)
    {
        s.add_listeners(it->first);
    }

    return s;
}
Вот как это работает:
#include "MessageDispatcher.h"

#include "ServerStatusMessageProcessor.h"
#include "EchoMessageProcessor.h"

#include <iostream>
#include <boost/smart_ptr/make_shared.hpp>

using namespace sample::proto;

int main()
{
    try
    {
        MessageDispatcher md;

        md.addProcessor(boost::make_shared<ServerStatusMessageProcessor>(&md));
        md.addProcessor(boost::make_shared<EchoMessageProcessor>());

        Message q;
        q.set_id("ServerStatus");
        Message ans = md.dispatch(q);

        std::cout << "query:  " << q.DebugString() << std::endl;
        std::cout << "answer: " << ans.DebugString() << std::endl;
    }
    catch (const std::exception & e)
    {
        std::cerr << e.what() << std::endl;
    }

    return 0;
}

P.S. Для написания данной статьи использовались:
  • gcc-4.4.5-linux
  • cmake-2.8.2
  • boost-1.42
  • protobuf-2.3.0

Пример выложен на github