由gh-md-toc创建
什么是SObjectizer?
SObjectizer是少数几个跨平台和开源的C++ "actor框架"之一。但SObjectizer不仅支持Actor模型,还支持发布-订阅模型和类CSP通道。SObjectizer的目标是显著简化C++中并发和多线程应用程序的开发。
SObjectizer允许将并发应用程序创建为一组通过异步消息相互交互的agent对象。它处理消息分发并为消息处理提供工作上下文。并通过提供各种现成的调度器来允许调整这些内容。
SObjectizer有什么特点?
成熟度。SObjectizer基于1995-2000年提出的想法。SObjectizer本身从2002年就开始开发。SObjectizer-5自2010年以来一直在持续发展。
稳定性。从一开始SObjectizer就被用于业务关键型应用程序,其中一些仍在生产中使用。SObjectizer中的重大变更很少,我们对它们非常谨慎。
跨平台。SObjectizer可在Windows、Linux、FreeBSD、macOS和Android上运行。
易用性。SObjectizer提供易于理解和使用的API,在SObjectizer的发行版中有大量示例,项目的Wiki中也有大量信息。
免费。SObjectizer根据BSD-3-CLAUSE许可证分发,因此可以免费用于开发专有商业软件。
SObjectizer不同于TBB、taskflow或HPX
SObjectizer经常被与Intel Threading Building Blocks、taskflow、HPX等类似工具进行比较。这种比较是毫无意义的。
所有这些工具都旨在用于解决并行计算领域的任务:它们允许通过利用多个CPU核心来减少计算时间。例如,您可以在一个CPU核心上花一个小时将视频文件从一种格式重新编码为另一种格式,但在四个核心上只需15分钟。这就是并行计算的主要目标。
SObjectizer用于略有不同的领域:并发计算。SObjectizer的主要目标是简化同时完成多项不同任务的过程。有时不需要使用一个以上的CPU核心来完成此操作。但如果有多个CPU核心,那么SObjectizer会使这些任务的处理和它们之间的交互变得更加容易。
棘手的部分是并行计算和并发计算在底层使用相同的并发机制和原语(如线程、互斥锁、原子操作等)。但从高层次的角度来看,并行计算和并发计算用于非常不同的任务。
作为使用SObjectizer实现或可以实现的应用程序示例,我们可以列出多线程代理服务器、自动控制系统、MQ代理、数据库服务器等。
展示代码!
HelloWorld示例
这是使用SObjectizer的agent表达的经典"Hello, World"示例:
#include <so_5/all.hpp>
class hello_actor final : public so_5::agent_t {
public:
using so_5::agent_t::agent_t;
void so_evt_start() override {
std::cout << "Hello, World!" << std::endl;
// 结束示例的工作。
so_deregister_agent_coop_normally();
}
};
int main() {
// 启动SObjectizer。
so_5::launch([](so_5::environment_t & env) {
// 在新的cooperation中添加一个hello_actor实例。
env.register_agent_as_coop( env.make_agent<hello_actor>() );
});
return 0;
}
乒乓示例
让我们看一个更有趣的例子,包含两个agent和它们之间的消息交换。这是actor框架的另一个著名示例,"乒乓":
#include <so_5/all.hpp>
struct ping {
int counter_;
};
struct pong {
int counter_;
};
class pinger final : public so_5::agent_t {
so_5::mbox_t ponger_;
void on_pong(mhood_t<pong> cmd) {
if(cmd->counter_ > 0)
so_5::send<ping>(ponger_, cmd->counter_ - 1);
else
so_deregister_agent_coop_normally();
}
public:
pinger(context_t ctx) : so_5::agent_t{std::move(ctx)} {}
void set_ponger(const so_5::mbox_t mbox) { ponger_ = mbox; }
void so_define_agent() override {
so_subscribe_self().event( &pinger::on_pong );
}
void so_evt_start() override {
so_5::send<ping>(ponger_, 1000);
}
};
class ponger final : public so_5::agent_t {
const so_5::mbox_t pinger_;
int pings_received_{};
public:
ponger(context_t ctx, so_5::mbox_t pinger)
: so_5::agent_t{std::move(ctx)}
, pinger_{std::move(pinger)}
{}
void so_define_agent() override {
so_subscribe_self().event(
[this](mhood_t<ping> cmd) {
++pings_received_;
so_5::send<pong>(pinger_, cmd->counter_);
});
}
void so_evt_finish() override {
std::cout << "pings received: " << pings_received_ << std::endl;
}
};
int main() {
so_5::launch([](so_5::environment_t & env) {
env.introduce_coop([](so_5::coop_t & coop) {
auto pinger_actor = coop.make_agent<pinger>();
auto ponger_actor = coop.make_agent<ponger>(
pinger_actor->so_direct_mbox());
pinger_actor->set_ponger(ponger_actor->so_direct_mbox());
});
});
return 0;
}
上面代码中的所有agent都在同一个工作线程上工作。如何将它们绑定到不同的工作线程?
这非常简单。只需使用适当的调度器:
int main() {
so_5::launch([](so_5::environment_t & env) {
env.introduce_coop(
so_5::disp::active_obj::make_dispatcher(env).binder(),
[](so_5::coop_t & coop) {
auto pinger_actor = coop.make_agent<pinger>();
auto ponger_actor = coop.make_agent<ponger>(
pinger_actor->so_direct_mbox());
pinger_actor->set_ponger(ponger_actor->so_direct_mbox());
});
});
return 0;
}
发布/订阅示例
SObjectizer通过多生产者/多消费者消息框支持发布/订阅模型。发送到该消息框的消息将被该消息类型的所有订阅者接收:
#include <so_5/all.hpp>
using namespace std::literals;
struct acquired_value {
std::chrono::steady_clock::time_point acquired_at_;
int value_;
};
class producer final : public so_5::agent_t {
const so_5::mbox_t board_;
so_5::timer_id_t timer_;
int counter_{};
struct acquisition_time final : public so_5::signal_t {};
void on_timer(mhood_t<acquisition_time>) {
// 为所有消费者发布下一个值。
so_5::send<acquired_value>(
board_, std::chrono::steady_clock::now(), ++counter_);
}
public:
producer(context_t ctx, so_5::mbox_t board)
: so_5::agent_t{std::move(ctx)}
, board_{std::move(board)}
{}
void so_define_agent() override {
so_subscribe_self().event(&producer::on_timer);
}
void so_evt_start() override {
// Agent将定期接收acquisition_time信号
// 没有初始延迟,周期为750ms。
timer_ = so_5::send_periodic<acquisition_time>(*this, 0ms, 750ms);
}
};
class consumer final : public so_5::agent_t {
const so_5::mbox_t board_;
const std::string name_;
void on_value(mhood_t<acquired_value> cmd) {
std::cout << name_ << ": " << cmd->value_ << std::endl;
}
public:
consumer(context_t ctx, so_5::mbox_t board, std::string name)
: so_5::agent_t{std::move(ctx)}
, board_{std::move(board)}
, name_{std::move(name)}
{}
void so_define_agent() override {
so_subscribe(board_).event(&consumer::on_value);
}
};
int main() {
so_5::launch([](so_5::environment_t & env) {
auto board = env.create_mbox();
env.introduce_coop([board](so_5::coop_t & coop) {
coop.make_agent<producer>(board);
coop.make_agent<consumer>(board, "first"s);
coop.make_agent<consumer>(board, "second"s);
});
std::this_thread::sleep_for(std::chrono::seconds(4));
env.stop();
});
return 0;
}
闪烁LED示例
SObjectizer中的所有agent都是有限状态机。支持几乎所有分层有限状态机(HSM)的功能:子状态和处理程序继承、on_enter/on_exit处理程序、状态超时、深浅状态历史,除了正交状态。
让我们看看实现以下状态图的agent可能是什么样子:
这是一个非常简单的示例,演示了上面所示状态图的agent:
#include <so_5/all.hpp>
using namespace std::literals;
class blinking_led final : public so_5::agent_t {
state_t off{ this }, blinking{ this },
blink_on{ initial_substate_of{ blinking } },
blink_off{ substate_of{ blinking } };
public :
struct turn_on_off : public so_5::signal_t {};
blinking_led(context_t ctx) : so_5::agent_t{std::move(ctx)} {
this >>= off;
off.just_switch_to<turn_on_off>(blinking);
blinking.just_switch_to<turn_on_off>(off);
blink_on
.on_enter([]{ std::cout << "ON" << std::endl; })
.on_exit([]{ std::cout << "off" << std::endl; })
.time_limit(1250ms, blink_off);
blink_off
.time_limit(750ms, blink_on);
}
};
int main()
{
so_5::launch([](so_5::environment_t & env) {
so_5::mbox_t m;
env.introduce_coop([&](so_5::coop_t & coop) {
auto led = coop.make_agent< blinking_led >();
m = led->so_direct_mbox();
});
const auto pause = [](auto duration) {
std::this_thread::sleep_for(duration);
};
std::cout << "打开闪烁10秒" << std::endl;
so_5::send<blinking_led::turn_on_off>(m);
pause(10s);
std::cout << "关闭闪烁5秒" << std::endl;
so_5::send<blinking_led::turn_on_off>(m);
pause(5s);
std::cout << "打开闪烁5秒" << std::endl;
so_5::send<blinking_led::turn_on_off>(m);
pause(5s);
std::cout << "正在停止..." << std::endl;
env.stop();
} );
return 0;
}
类似CSP的乒乓球示例
SObjectizer允许即使在没有代理的情况下也能编写并发应用程序。 只能使用普通线程和类似CSP的通道。
这是Ping-Pong示例的普通线程实现(请注意 main()不是异常安全的):
#include <so_5/all.hpp>
struct ping {
int counter_;
};
struct pong {
int counter_;
};
void pinger_proc(so_5::mchain_t self_ch, so_5::mchain_t ping_ch) {
so_5::send<ping>(ping_ch, 1000);
// 读取所有消息直到通道关闭。
so_5::receive( so_5::from(self_ch).handle_all(),
[&](so_5::mhood_t<pong> cmd) {
if(cmd->counter_ > 0)
so_5::send<ping>(ping_ch, cmd->counter_ - 1);
else {
// 必须关闭通道以中断`receive`调用。
so_5::close_drop_content(so_5::exceptions_enabled, self_ch);
so_5::close_drop_content(so_5::exceptions_enabled, ping_ch);
}
});
}
void ponger_proc(so_5::mchain_t self_ch, so_5::mchain_t pong_ch) {
int pings_received{};
// 读取所有消息直到通道关闭。
so_5::receive( so_5::from(self_ch).handle_all(),
[&](so_5::mhood_t<ping> cmd) {
++pings_received;
so_5::send<pong>(pong_ch, cmd->counter_);
});
std::cout << "收到的ping数: " << pings_received << std::endl;
}
int main() {
so_5::wrapped_env_t sobj;
auto pinger_ch = so_5::create_mchain(sobj);
auto ponger_ch = so_5::create_mchain(sobj);
std::thread pinger{pinger_proc, pinger_ch, ponger_ch};
std::thread ponger{ponger_proc, ponger_ch, pinger_ch};
ponger.join();
pinger.join();
return 0;
}
另一个带有类似Golang的select()语句的CSP示例
SObjectizer提供了一个类似于Golang的select语句的select()函数。该函数允许等待来自多个消息链的传入消息。它还允许等待消息链准备好接受新的传出消息。因此,select()允许在目标消息链已满的情况下进行非阻塞send()调用并处理传入消息。
这里有一个使用select()作为背压机制的斐波那契计算示例(如果数字读取线程尚未读取前一个数字,则数字生成线程将等待)。还要注意,此示例中的main()函数是异常安全的。
#include <so_5/all.hpp>
#include <chrono>
using namespace std;
using namespace std::chrono_literals;
using namespace so_5;
struct quit {};
void fibonacci( mchain_t values_ch, mchain_t quit_ch )
{
int x = 0, y = 1;
mchain_select_result_t r;
do
{
r = select(
from_all().handle_n(1),
// 当values_ch准备好接受新的传出消息时,
// 发送一个包含值'x'的新'int'类型消息。
send_case( values_ch, message_holder_t<int>::make(x),
[&x, &y] { // 此代码块将在send()之后被调用。
auto old_x = x;
x = y; y = old_x + y;
} ),
// 如果quit_ch中有'quit'消息,则接收它。
receive_case( quit_ch, [](quit){} ) );
}
// 只要我们发送了某些内容且没有接收到任何内容,就继续循环。
while( r.was_sent() && !r.was_handled() );
}
int main()
{
wrapped_env_t sobj;
thread fibonacci_thr;
auto thr_joiner = auto_join( fibonacci_thr );
// 斐波那契数的链将具有有限容量。
auto values_ch = create_mchain( sobj, 1s, 1,
mchain_props::memory_usage_t::preallocated,
mchain_props::overflow_reaction_t::abort_app );
auto quit_ch = create_mchain( sobj );
auto ch_closer = auto_close_drop_content( values_ch, quit_ch );
fibonacci_thr = thread{ fibonacci, values_ch, quit_ch };
// 从values_ch读取前10个数字。
receive( from( values_ch ).handle_n( 10 ),
// 并将每个数字显示到标准输出。
[]( int v ) { cout << v << endl; } );
send< quit >( quit_ch );
}
想了解更多?
有关SObjectizer的更多信息可以在项目Wiki的相应部分找到。
伴随项目so5extra中还有更多有用的东西
有一个单独的伴随项目so5extra,其中包含许多各种有用的东西,如基于Asio的调度器、其他类型的mbox、可撤销定时器、同步请求等。
例如,以下是同步交互的样子(使用so_5::extra::sync
):
#include <so_5_extra/sync/pub.hpp>
#include <so_5/all.hpp>
// 为方便起见使用短别名。
namespace sync_ns = so_5::extra::sync;
using namespace std::chrono_literals;
// 服务提供者的类型。
class service_provider_t final : public so_5::agent_t
{
public :
using so_5::agent_t::agent_t;
void so_define_agent() override
{
so_subscribe_self().event(
[]( sync_ns::request_mhood_t<int, std::string> cmd ) {
// 转换传入值,将结果转换为字符串并发送回去。
cmd->make_reply( std::to_string(cmd->request() * 2) );
} );
}
};
// 服务消费者的类型。
class consumer_t final : public so_5::agent_t
{
// 服务提供者的消息框。
const so_5::mbox_t m_service;
public :
consumer_t( context_t ctx, so_5::mbox_t service )
: so_5::agent_t{ std::move(ctx) }
, m_service{ std::move(service) }
{}
void so_evt_start() override
{
// 发出请求并等待结果,最多等待500ms。
auto result = sync_ns::request_reply<int, std::string>(
// 请求的目的地。
m_service,
// 最长等待时间。
500ms,
// 请求值。
4 );
std::cout << "结果: " << result << std::endl;
so_deregister_agent_coop_normally();
}
};
int main()
{
so_5::launch( [](so_5::environment_t & env) {
env.introduce_coop(
// 每个代理应该在自己的线程上工作。
so_5::disp::active_obj::make_dispatcher( env ).binder(),
[](so_5::coop_t & coop) {
auto service_mbox = coop.make_agent< service_provider_t >()
->so_direct_mbox();
coop.make_agent< consumer_t >( service_mbox );
} );
} );
}
SObjectizer本身旨在成为一个相对较小且没有外部依赖的项目。so5extra没有这个限制。这就是为什么基于Asio的调度器和环境基础设施在so5extra中实现,而不是在SObjectizer中。
SObjectizer的另一个重要特性是稳定性。我们试图使SObjectizer尽可能保持稳定,但也需要尝试一些新功能,即使我们还不知道它们会有多成功和需求。so5extra是尝试新功能的好地方,其中一些功能随着时间的推移可能会被移至SObjectizer。
因此,如果您在SObjectizer中找不到有用的功能,不妨看看so5extra。也许它已经在那里了。
局限性
SObjectizer是一个进程内消息分发框架。它不直接支持分布式应用程序。但在这种情况下可以使用外部工具和库。请看一下我们的mosquitto_transport实验: https://github.com/Stiffstream/mosquitto_transport
获取和构建
可以从GitHub检出SObjectizer。包含SObjectizer源代码的存档可以从GitHub或SourceForge下载。
有两种构建SObjectizer的方法。 第一种是使用Mxx_ru工具。第二种是使用CMake。
注意。从v.5.5.15.2版本开始,支持Android平台。只能通过CMake为Android构建。请参阅下面的相应部分。
SObjectizer还可以通过vcpkg和Conan依赖管理器安装和使用。请参阅下面的相应部分。
SObjectizer-5.8需要C++17!
SObjectizer的5.8分支需要C++17。
如果您需要支持C++14或C++11,请尝试查看SourceForge上的旧版本SObjectizer。或联系stiffstream讨论将SObjectizer-5.8移植到较旧的C++标准。
通过Mxx_ru构建
注意。这是构建SObjectizer的标准方法。这种方法用于SObjectizer的开发过程中。
要构建SObjectizer,需要使用Ruby语言和Mxx_ru工具。 安装Ruby,然后通过RubyGems命令安装Mxx_ru:
gem install Mxx_ru
如果您已经安装了Mxx_ru,请更新到至少1.6.14.6版本:
gem update Mxx_ru
可以从GitHub上的Git仓库获取SObjectizer:
git clone https://github.com/stiffstream/sobjectizer
要构建 SObjectizer:
cd sobjectizer/dev
ruby build.rb
SObjectizer 的静态和共享库将被构建。库文件将被放置在 target/release 子目录中。
如果你只想构建共享库:
cd sobjectizer/dev
ruby so_5/prj.rb
或者如果你只想构建静态库:
cd sobjectizer/dev
ruby so_5/prj_s.rb
要构建 SObjectizer 以及所有测试和示例:
cd sobjectizer/dev
ruby build_all.rb
请注意,在 FreeBSD 下可能需要定义 LD_LIBRARY_PATH 环境变量。在 FreeBSD 下实际的构建命令序列可能如下:
cd sobjectizer/dev
export LD_LIBRARY_PATH=target/release
ruby build_all.rb
要为 SObjectizer 构建 html 格式的文档,需要使用 Doxygen 工具。如果已安装,则:
cd sobjectizer/doxygen
doxygen
生成的 html 文件将位于 sobjectizer/dev/doc/html。
注意。如果你没有自己指定 MXX_RU_CPP_TOOLSET,那么 Mxx_ru 将尝试自动检测你的 C++ 工具集。如果你想使用非系统默认的 C++ 编译器,请手动定义 MXX_RU_CPP_TOOLSET 环境变量。它可能看起来像:
export MXX_RU_CPP_TOOLSET="clang_linux compiler_name=clang++-6 linker_name=clang++-6"
关于根据你的需求调整 Mxx_ru 的更多信息,你可以在相应文档中找到。
通过 CMake 构建
要通过 CMake 构建 SObjectizer,需要安装 CMake 并了解如何使用它。以下操作只是一个演示。有关 SObjectizer 的 cmake 构建系统的更多详细信息,请参阅 dev/cmake/CmakeQuickHowto.txt
要在 Linux/FreeBSD 下通过命令行获取和构建 SObjectizer,请运行:
git clone https://github.com/stiffstream/sobjectizer
cd sobjectizer
mkdir cmake_build
cd cmake_build
cmake -DCMAKE_INSTALL_PREFIX=target -DCMAKE_BUILD_TYPE=Release ../dev
cmake --build . --config Release
cmake --build . --config Release --target install
这些命令将创建所有必要的 Makefile,然后构建 SObjectizer。如果还需要构建示例和测试,请使用
cmake -DBUILD_ALL=ON -DCMAKE_INSTALL_PREFIX=target ../dev
当 'make install' 完成后,'./target' 将包含两个子文件夹:'./bin' 包含示例,'./lib' 包含共享库 libso.5.x.x.so
CMake 构建系统目前支持以下选项:
SOBJECTIZER_BUILD_STATIC
. 启用将 SObjectizer 构建为静态库 [默认: ON]SOBJECTIZER_BUILD_SHARED
. 启用将 SObjectizer 构建为共享库 [默认: ON]BUILD_ALL
. 启用构建示例和测试 [默认: OFF]BUILD_EXAMPLES
. 启用构建示例 [默认: OFF]BUILD_TESTS
. 启用构建测试 [默认: OFF]
请注意,如果 BUILD_ALL
或 BUILD_EXAMPLES
或 BUILD_TESTS
被打开,那么 SOBJECTIZER_BUILD_STATIC
和 SOBJECTIZER_BUILD_SHARED
都必须打开。这意味着如果 SOBJECTIZER_BUILD_STATIC
或 SOBJECTIZER_BUILD_SHARED
被关闭,那么 BUILD_ALL
/BUILD_EXAMPLES
/BUILD_TESTS
都必须关闭。
要在 Windows 下通过 MS Visual Studio 2013 从命令行构建 SObjectizer:
git clone https://github.com/stiffstream/sobjectizer
cd sobjectizer
mkdir cmake_build
cd cmake_build
cmake -DCMAKE_INSTALL_PREFIX=target -DCMAKE_BUILD_TYPE=Release -G "Visual Studio 15 2017" ../dev
cmake --build . --config Release
cmake --build . --config Release --target install
如果还需要构建示例,请在 cmake 调用中使用 BUILD_ALL
:
cmake -DCMAKE_INSTALL_PREFIX=target -DCMAKE_BUILD_TYPE=Release -DBUILD_ALL=ON -G "Visual Studio 15 2017" ../dev
从 v.5.5.24 开始,SObjectizer 提供 sobjectizer-config.cmake 文件。
这些文件会自动安装到 <target>/lib/cmake/sobjectizer
子文件夹中。这允许通过 CMake 的 find_package 命令使用 SObjectizer。
为 Android 构建
可以通过相当新的 Android NDK 或 CrystaX NDK 为 Android 构建。
使用 Android NDK 构建
你需要在系统中安装 Android SDK 和 Android NDK。以及适当版本的 CMake。你还需要正确设置环境变量 ANDROID_HOME
, ANDROID_NDK
。然后你可以发出以下命令:
git clone https://github.com/stiffstream/sobjectizer
cd sobjectizer
mkdir cmake_build
cd cmake_build
cmake -DBUILD_ALL -DCMAKE_INSTALL_PREFIX=target -DCMAKE_BUILD_TYPE=Release \
-DCMAKE_TOOLCHAIN_FILE=${ANDROID_NDK}/build/cmake/android.toolchain.cmake \
-G Ninja \
-DANDROID_ABI=arm64-v8a \
-DANDROID_NDK=${ANDROID_NDK} \
-DANDROID_NATIVE_API_LEVEL=23 \
-DANDROID_TOOLCHAIN=clang \
../dev
cmake --build . --config=Release
cmake --build . --config=Release --target install
使用 CrystaX NDK 构建
你需要在系统中已经安装了 CrystaX NDK v.10.4.0 或更高版本。CMake 用于构建 SObjectizer:
git clone https://github.com/stiffstream/sobjectizer
cd sobjectizer
mkdir cmake_build
cd cmake_build
export NDK=/path/to/the/crystax-ndk
cmake -DBUILD_ALL -DCMAKE_INSTALL_PREFIX=result -DCMAKE_TOOLCHAIN_FILE=$NDK/cmake/toolchain.cmake -DANDROID_ABI=arm64-v8a ../dev
make
make test
make install
使用 C++ 依赖管理器
通过 vcpkg 使用
要通过 vcpkg 使用 SObjectizer,需要执行以下步骤。
安装 sobjectizer
包:
vcpkg install sobjectizer
在你的 CMakeLists.txt 文件中添加以下行:
find_package(sobjectizer CONFIG REQUIRED)
target_link_libraries(your_target sobjectizer::SharedLib) # 或 sobjectizer::StaticLib
通过 Conan 使用
注意。 自 2021 年 2 月起,SObjectizer 的新版本仅通过 conan-center 提供。
安装 SObjectizer 并将其添加到 conanfile.txt
要通过 Conan 使用 SObjectizer,需要将 SObjectizer 添加到项目的 conanfile.txt
中:
[requires]
sobjectizer/5.8.0
可能还需要为 SObjectizer 指定 shared
选项。例如,将 SObjectizer 构建为静态库:
[options]
sobjectizer:shared=False
为你的项目安装依赖项:
conan install SOME_PATH --build=missing
将 SObjectizer 添加到你的 CMakeLists.txt
...
include(${CMAKE_BINARY_DIR}/conanbuildinfo.cmake)
conan_basic_setup()
...
target_link_libraries(your_target ${CONAN_LIBS})
许可证
SObjectizer 在 3 条款 BSD 许可下分发。有关许可信息,请参阅 LICENSE 文件。