public function actionCallback(): void
{
$this->setHeaders();
self::listener();
}
private function setHeaders(): void
{
set_time_limit(0);
header('Content-Type: text/event-stream');
header('Connection: keep-alive');
header('Cache-Control: no-store');
echo 'retry: 10000' . PHP_EOL;
}
public static function listener($timeout = 0): void
{
try {
$context = self::getAmqpConnection();
$queue = $context->createQueue('queue');
$subscriptionConsumer = $context->createSubscriptionConsumer();
$subscriptionConsumer->subscribe(
$context->createConsumer($queue),
function (PsrMessage $message, PsrConsumer $consumer) {
try {
$msg = json_decode($message->getBody());
// сюда заходит
$id = time();
$msg = json_encode($msg, JSON_THROW_ON_ERROR);
self::sendMsg($id, $msg);
$consumer->acknowledge($message);
} catch (\Throwable $e) {
}
}
);
$subscriptionConsumer->consume($timeout);
} catch (\Throwable $e) {
}
}
private static function sendMsg(string $id, string $msg): void
{
echo "data: $msg" . PHP_EOL;
echo "id: $id" . PHP_EOL;
echo PHP_EOL;
ob_flush();
flush();
}
我想在 php 中实现服务器发送的事件 - https://bigboxcode.com/php-server-sent-events-sse
但是当你启动队列监听器时,它只是在一个无限循环中并且没有返回结果
是否可以从闭包订阅输出回声?

SSE 是一个标准,描述了如何从客户端建立第一个连接的那一刻开始向客户端发送数据。
而AMQP是一个异步协议,即 想象一下,您在一个完全不同的进程中运行回调函数,这会丢失您的“第一个”连接,而这正是向客户端发送事件所需要的。
通过 AMQP 总线 - 这是行不通的。