You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
shipin/crmeb/services/workerman/chat/ChatService.php

199 lines
6.6 KiB

1 year ago
<?php
// +----------------------------------------------------------------------
// | CRMEB [ CRMEB赋能开发者,助力企业发展 ]
// +----------------------------------------------------------------------
// | Copyright (c) 2016~2023 https://www.crmeb.com All rights reserved.
// +----------------------------------------------------------------------
// | Licensed CRMEB并不是自由软件,未经许可不能去掉CRMEB相关版权
// +----------------------------------------------------------------------
// | Author: CRMEB Team <admin@crmeb.com>
// +----------------------------------------------------------------------
namespace crmeb\services\workerman\chat;
use app\services\kefu\service\StoreServiceRecordServices;
use app\services\kefu\service\StoreServiceServices;
use Channel\Client;
use crmeb\services\workerman\ChannelService;
use crmeb\services\workerman\Response;
use Workerman\Connection\TcpConnection;
use Workerman\Lib\Timer;
use Workerman\Worker;
class ChatService
{
/**
* @var Worker
*/
protected $worker;
/**
* @var TcpConnection[]
*/
protected $connections = [];
/**
* @var TcpConnection[]
*/
protected $user = [];
/**
* 在线客服
* @var TcpConnection[]
*/
protected $kefuUser = [];
/**
* @var ChatHandle
*/
protected $handle;
/**
* @var Response
*/
protected $response;
/**
* @var int
*/
protected $timer;
public function __construct(Worker $worker)
{
$this->worker = $worker;
$this->handle = new ChatHandle($this);
$this->response = new Response();
}
public function setUser(TcpConnection $connection)
{
$this->user[$connection->user->uid] = $connection;
}
/**
* 获得当前在线客服
* @return TcpConnection[]
*/
public function kefuUser()
{
return $this->kefuUser;
}
/**
* 设置当前在线客服
* @param TcpConnection $connection
*/
public function setKefuUser(TcpConnection $connection, bool $isUser = true)
{
$this->kefuUser[$connection->kefuUser->uid] = $connection;
if ($isUser) {
$this->user[$connection->user->uid] = $connection;
}
}
public function user($key = null)
{
return $key ? ($this->user[$key] ?? false) : $this->user;
}
public function onConnect(TcpConnection $connection)
{
$this->connections[$connection->id] = $connection;
$connection->lastMessageTime = time();
}
public function onMessage(TcpConnection $connection, $res)
{
$connection->lastMessageTime = time();
$res = json_decode($res, true);
if (!$res || !isset($res['type']) || !$res['type'] || $res['type'] == 'ping') {
return $this->response->connection($connection)->success('ping', ['now' => time()]);
}
if (!method_exists($this->handle, $res['type'])) return;
try {
$this->handle->{$res['type']}($connection, $res + ['data' => []], $this->response->connection($connection));
} catch (\Throwable $e) {
}
}
public function onWorkerStart(Worker $worker)
{
ChannelService::connet();
Client::on('crmeb_chat', function ($eventData) use ($worker) {
if (!isset($eventData['type']) || !$eventData['type']) return;
$ids = isset($eventData['ids']) && count($eventData['ids']) ? $eventData['ids'] : array_keys($this->user);
$fun = $eventData['fun'] ?? false;
foreach ($ids as $id) {
if (isset($this->user[$id])) {
if ($fun) {
$this->handle->{$eventData['type']}($this->user[$id], $eventData + ['data' => []], $this->response->connection($this->user[$id]));
} else {
$this->response->connection($this->user[$id])->success($eventData['type'], $eventData['data'] ?? null);
}
}
}
});
$this->timer = Timer::add(15, function () use (&$worker) {
$time_now = time();
foreach ($worker->connections as $connection) {
if ($time_now - $connection->lastMessageTime > 12) {
//定时器判断当前用户是否下线
if (isset($connection->user->uid) && !isset($connection->user->isTourist)) {
/** @var StoreServiceRecordServices $service */
$service = app()->make(StoreServiceRecordServices::class);
$service->updateRecord(['to_uid' => $connection->user->uid], ['online' => 0]);
}
$this->response->connection($connection)->close('timeout');
//广播给客服谁下线了
foreach ($this->kefuUser as $uid => &$conn) {
if (isset($connection->user->uid) && $connection->user->uid != $uid) {
if (isset($conn->onlineUids) && ($key = array_search($connection->user->uid, $conn->onlineUids)) !== false) {
unset($conn->onlineUids[$key]);
}
$this->response->connection($conn)->send('user_online', ['to_uid' => $connection->user->uid, 'online' => 0]);
}
}
}
}
});
Timer::add(2, function () use (&$worker) {
$uids = [];
foreach ($this->user() as $uid => $connection) {
if (!isset($connection->isTourist)) {
$uids[] = $uid;
}
}
if ($uids) {
//除了当前在线的其他全部都下线
/** @var StoreServiceRecordServices $service */
$service = app()->make(StoreServiceRecordServices::class);
$service->updateOnline(['notUid' => $uids], ['online' => 0]);
}
$kefuUid = array_keys($this->kefuUser());
if ($kefuUid) {
/** @var StoreServiceServices $kefuService */
$kefuService = app()->make(StoreServiceServices::class);
$kefuService->updateOnline(['notUid' => $kefuUid], ['online' => 0]);
}
});
}
public function onClose(TcpConnection $connection)
{
var_dump('close');
unset($this->connections[$connection->id]);
if (isset($connection->user->uid)) {
unset($this->user[$connection->user->uid]);
}
if (isset($connection->kefuUser->uid)) {
unset($this->kefuUser[$connection->kefuUser->uid]);
}
}
}