info('接收数据 =>' . $data); if ($data != '') { $datadevice = $this->stringdata($data); Redis::set('dbtype',0); if(Redis::get('sbjc:' . $datadevice[0]) == ''){ Redis::set('sbjc:' . $datadevice[0], $data); $this->dbdevice($this->stringdata($data)); }else if (Redis::get('sbjc:' . $datadevice[0]) != $data){ $this->dbdevice($this->stringdata($data)); Redis::set('sbjc:' . $datadevice[0], $data); } Redis::lpush('police',$data); $this->saveInfluxDb($data); $this->police(); } $dalen = Redis::llen('Devicesdata');//返回队列长度 if ($dalen > 5000 ) { //当队列值达到巅峰以后切换数据库 Redis::select(1);//使用备用数据库//释放主数据库 Redis::set('dbtype',1); $this->deleteDbRedis(); } if($dalen > 5000 && Redis::get('dbtype') == 1){ Redis::select(0); Redis::set('dbtype',0); $this->deleteDb(); } } //分割字符串 private function stringdata($data = ''): array { if ($data != '') { return explode('/', $data); } } //查询redis所有的值 private function devicedata(): array { $keys = Redis::keys("*");//获取所有的键 $data = Redis::mget($keys);//获取所有的键的值 $arrdata = []; foreach ($data as $key => $vel) { $expdata = explode('/', $vel); array_push($arrdata, $expdata); } return $arrdata; } //清楚指定数据库 redis private function deleteDbRedis (){ Redis::select(0); Redis::flushdb(); Redis::select(1); } private function deleteDb(){ Redis::select(1); Redis::flushdb(); Redis::select(0); } //查询出返回值 private function dbdevice($data = []) { $Devices = new DevicesController(); if ($data != []) { $devicedata = DB::table('device as d') ->where('d.devicenum', '=', $data[0]) ->leftjoin('device_type as t', 't.tid', '=', 'd.dtype') ->leftjoin('gas as g', 'g.id', '=', 'd.status') ->leftjoin('danwei as c', 'c.id', '=', 'd.devicemonad') ->leftjoin('status as s', 's.id', '=', 'd.devicepolice') ->select('d.devicenum', 'd.username', 'd.deviceremark', 'd.devicelinkman', 'd.devicephone', 'd.deviceinfo', 't.tname', 'g.gas', 'c.danwei', 's.status_name') ->first(); if($devicedata != ''){ Redis::lPush('Devicesdata', serialize($devicedata));//进入队列进行 } } } //处理报警 private function police() { $davicedata = $this->stringdata(Redis::lpop('police')); // 处理发送数据报警 $isdevicenumm = DB::table('device') ->where('devicenum','=',$davicedata[0]) ->first(); if($isdevicenumm){ $isdavice = DB::table('reportpolice') ->where('devicenumber', '=', $davicedata[0]) ->select('status', 'endtime') ->first(); $type = json_decode(json_encode($isdavice), true); //设备再次报警 if ($type['endtime'] != '' && $type['status'] == 2 && $davicedata[1] != 1){ $up = DB::table('reportpolice') ->where('devicenumber', '=', $davicedata[0]) ->update(['endtime'=>'', 'police' => 1,'starttime'=>time(),'status' => 1,'policestatus'=> $davicedata[1],'concentration' => $davicedata[2]]); } if($type['status'] == 1 && $type['endtime'] == '' && $davicedata[1] == 1){ $up = DB::table('reportpolice') ->where('devicenumber', '=', $davicedata[0]) ->update(['endtime'=>time(),'status' => 2]); } if ($isdavice) { //改成报警状态 if ($davicedata[1] != 1 && $type['endtime'] == '') { $up = DB::table('reportpolice') ->where('devicenumber', '=', $davicedata[0]) ->update(['concentration' => $davicedata[2], 'policestatus' => $davicedata[1], 'police' => 1, 'status' => 1,'starttime'=>time()]); } } else { if ($davicedata[1] != 1) { $add = DB::table('reportpolice') ->insertGetId([ 'devicenumber' => $davicedata[0], 'starttime' => time(), 'concentration' => $davicedata[2], 'policestatus' => $davicedata[1], 'status' => 1, 'location' => $isdevicenumm->deviceinfo, ]); } } $updavice = DB::table('device') ->where('devicenum', '=', $davicedata[0]) ->update(['devicepolice' => $davicedata[1], 'nd' => $davicedata[2], 'update_time' => time()]); } } /** * 保存到时序数据库influxdb * @param $data * @throws \InfluxDB\Database\Exception * @throws \InfluxDB\Exception */ private function saveInfluxDb($data) { $data_array = explode('/', $data); $device_num = $data_array[0]; $device_status = $data_array[1]; $device_value = $data_array[2]; $influxDb = Config::get('database.influxdb'); $host = $influxDb['default']['host']; $port = $influxDb['default']['port']; $username = $influxDb['default']['username']; $password = $influxDb['default']['password']; $client = new \InfluxDB\Client($host, $port, $username, $password); $database = $client->selectDB($influxDb['default']['database']); $user_id = DB::table('device')->where('devicenum', $device_num)->value('uid'); if($user_id) { $points = [ new \InfluxDB\Point( 'devices', $device_value, ['device_num' => $device_num, 'device_status' => $device_status, 'user_id' => $user_id], [], time() ), ]; $database->writePoints($points, \InfluxDB\Database::PRECISION_SECONDS); } } }