phpMQTT.php 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. <?php
  2. /* phpMQTT */
  /* phpMQTT: a small MQTT client that talks to a broker over a plain TCP
     stream socket. It announces itself with protocol name "MQIsdp", level 3,
     and offers connect / subscribe / publish / ping / disconnect plus proc(),
     a single polling step that dispatches received PUBLISH packets (packet
     type 3) to the callbacks registered in $topics. */
  class phpMQTT {

      private $socket;            /* stream resource returned by fsockopen() */
      private $msgid = 1;         /* counter for message ids */
      public $keepalive = 10;     /* keepalive interval sent in CONNECT; also drives ping/reconnect in proc() */
      public $timesinceping;      /* unix time of connect or of the last packet seen; used to detect disconnects */
      public $topics = array();   /* subscribed topics: filter => array('qos' => int, 'function' => callable) */
      public $debug = false;      /* when true, progress messages are echoed */
      public $address;            /* broker address */
      public $port;               /* broker port */
      public $clientid;           /* client id sent to the broker */
      public $will;               /* will of the client: array with 'topic', 'content', 'qos', 'retain' */
      private $username;          /* username sent in CONNECT, if any */
      private $password;          /* password sent in CONNECT, if any */

      /* Stores the broker details; no connection is opened here. */
      public function __construct($address, $port, $clientid){
          $this->broker($address, $port, $clientid);
      }

      /* broker: sets the broker address, port and the client id to use. */
      public function broker($address, $port, $clientid){
          $this->address = $address;
          $this->port = $port;
          $this->clientid = $clientid;
      }

      /* connect: opens the socket, sends CONNECT and waits for CONNACK.
         inputs: $clean:    should the client send a clean session flag
                 $will:     optional array('topic','content','qos','retain')
                 $username: optional username
                 $password: optional password
         Truthy $will/$username/$password values are remembered on the
         instance, so a later connect(false) from proc() reuses them.
         returns: true on an accepted CONNACK, false otherwise (details go to
         error_log). */
      public function connect($clean = true, $will = NULL, $username = NULL, $password = NULL){
          if($will) $this->will = $will;
          if($username) $this->username = $username;
          if($password) $this->password = $password;

          $address = gethostbyname($this->address);
          $this->socket = fsockopen($address, $this->port, $errno, $errstr, 60);
          if(!$this->socket){
              error_log("fsockopen() $errno, $errstr \n");
              return false;
          }
          stream_set_timeout($this->socket, 5);
          // Non-blocking mode: read() polls until the requested bytes arrive.
          stream_set_blocking($this->socket, 0);

          // Evaluate each optional section once so the flag byte and the
          // payload can never disagree about what is present.
          $haswill = ($this->will != NULL);
          $hasuser = ($this->username != NULL);
          $haspass = ($this->password != NULL);

          $i = 0;          // running length of the packet body
          $buffer = "";

          // Variable header: length-prefixed protocol name, then level 3.
          $buffer .= $this->strwritestring("MQIsdp", $i);
          $buffer .= chr(0x03); $i++;

          // Connect flags.
          $var = 0;
          if($clean) $var += 2;
          if($haswill){
              $var += 4;                                // will flag
              $var += ($this->will['qos'] << 3);        // will qos
              if($this->will['retain']) $var += 32;     // will retain
          }
          if($hasuser) $var += 128;
          if($haspass) $var += 64;
          $buffer .= chr($var); $i++;

          // Keep alive, 16 bit big endian.
          $buffer .= chr($this->keepalive >> 8); $i++;
          $buffer .= chr($this->keepalive & 0xff); $i++;

          // Payload: client id, then will topic/content, username, password.
          $buffer .= $this->strwritestring($this->clientid, $i);
          if($haswill){
              $buffer .= $this->strwritestring($this->will['topic'], $i);
              $buffer .= $this->strwritestring($this->will['content'], $i);
          }
          if($hasuser) $buffer .= $this->strwritestring($this->username, $i);
          if($haspass) $buffer .= $this->strwritestring($this->password, $i);

          // The remaining length is a variable-length field: a single chr($i)
          // is only correct up to 127 bytes, so encode it properly.
          $head = chr(0x10) . $this->setmsglength($i);
          fwrite($this->socket, $head, strlen($head));
          fwrite($this->socket, $buffer);

          // CONNACK is 4 bytes: packet type 2 in the high nibble of byte 0,
          // return code in byte 3 (0 = accepted).
          $string = $this->read(4);
          if(strlen($string) < 4){
              // The stream ended before a full CONNACK arrived.
              error_log("Connection failed! (incomplete CONNACK)\n");
              return false;
          }
          if(ord($string[0])>>4 == 2 && $string[3] == chr(0)){
              if($this->debug) echo "Connected to Broker\n";
          }else{
              error_log(sprintf("Connection failed! (Error: 0x%02x 0x%02x)\n",
                  ord($string[0]), ord($string[3])));
              return false;
          }

          $this->timesinceping = time();
          return true;
      }

      /* read: reads in so many bytes.
         inputs: $int: number of bytes wanted
                 $nb:  when truthy, do a single fread() and return whatever it
                       gives back (possibly nothing); otherwise keep reading
                       until $int bytes are collected or the stream hits EOF
         returns: the bytes read. */
      public function read($int = 8192, $nb = false){
          if($nb){
              return fread($this->socket, $int);
          }

          $string = "";
          $togo = $int;
          while($togo > 0 && !feof($this->socket)){
              $fread = fread($this->socket, $togo);
              if($fread === false) break;   // read error: stop instead of spinning forever
              $string .= $fread;
              $togo = $int - strlen($string);
          }
          return $string;
      }

      /* subscribe: subscribes to topics.
         inputs: $topics: array of filter => array('qos' => int, 'function' => callable);
                          every entry is also stored in $this->topics
                 $qos:    shifted into the flag bits of the SUBSCRIBE header
         The reply that follows (2 header bytes plus the announced number of
         bytes) is read and discarded. */
      public function subscribe($topics, $qos = 0){
          $i = 0;
          $buffer = "";

          $id = $this->msgid;
          $buffer .= chr($id >> 8); $i++;
          $buffer .= chr($id % 256); $i++;

          foreach($topics as $key => $topic){
              $buffer .= $this->strwritestring($key, $i);
              $buffer .= chr($topic["qos"]); $i++;
              $this->topics[$key] = $topic;
          }

          // NOTE(review): with the default $qos = 0 the header byte is 0x80;
          // the MQTT spec asks for QoS 1 (0x82) on SUBSCRIBE - confirm against
          // the target broker before changing it.
          $cmd = 0x80;
          $cmd += ($qos << 1);

          // Variable-length remaining length: chr($i) breaks above 127 bytes.
          $head = chr($cmd) . $this->setmsglength($i);
          fwrite($this->socket, $head, strlen($head));
          fwrite($this->socket, $buffer, $i);

          // Consume the reply: second byte is its remaining length.
          $string = $this->read(2);
          $bytes = ord(substr($string, 1, 1));
          $this->read($bytes);
      }

      /* ping: sends a keep alive ping */
      public function ping(){
          $head = chr(0xc0) . chr(0x00);
          fwrite($this->socket, $head, 2);
          if($this->debug) echo "ping sent\n";
      }

      /* disconnect: sends a proper disconnect cmd */
      public function disconnect(){
          $head = chr(0xe0) . chr(0x00);
          fwrite($this->socket, $head, 2);
      }

      /* close: sends a proper disconnect, then closes the socket */
      public function close(){
          $this->disconnect();
          fclose($this->socket);
      }

      /* publish: publishes $content on a $topic.
         inputs: $topic:   topic name
                 $content: payload bytes, sent as-is
                 $qos:     when non-zero a message id is added and the qos is
                           written into the header flags
                 $retain:  when truthy the retain bit is set
         No acknowledgement is awaited here. */
      public function publish($topic, $content, $qos = 0, $retain = 0){
          $i = 0;
          $buffer = "";
          $buffer .= $this->strwritestring($topic, $i);

          if($qos){
              $id = $this->msgid++;
              // Message ids are 16 bit; wrap back to 1 so the two id bytes
              // below never overflow.
              if($this->msgid > 65535) $this->msgid = 1;
              $buffer .= chr($id >> 8); $i++;
              $buffer .= chr($id % 256); $i++;
          }

          $buffer .= $content;
          $i += strlen($content);

          $cmd = 0x30;
          if($qos) $cmd += $qos << 1;
          if($retain) $cmd += 1;

          $head = chr($cmd) . $this->setmsglength($i);
          fwrite($this->socket, $head, strlen($head));
          fwrite($this->socket, $buffer, $i);
      }

      /* message: processes a received PUBLISH body.
         inputs: $msg: 2-byte big-endian topic length, the topic, then the payload
         Every entry of $this->topics whose filter matches the topic and whose
         'function' is callable is invoked as function($topic, $payload). */
      public function message($msg){
          if(strlen($msg) < 2){
              // Too short to even hold the topic length.
              if($this->debug) echo "msg recieved but no match in subscriptions\n";
              return;
          }

          $tlen = (ord($msg[0])<<8) + ord($msg[1]);
          $topic = (string) substr($msg, 2, $tlen);
          $msg = (string) substr($msg, ($tlen+2));

          $found = 0;
          foreach($this->topics as $key => $top){
              if(!preg_match($this->topicpattern($key), $topic)) continue;
              // is_callable() accepts function names as before, and also
              // closures and array callables.
              if(isset($top['function']) && is_callable($top['function'])){
                  call_user_func($top['function'], $topic, $msg);
                  $found = 1;
              }
          }
          if($this->debug && !$found) echo "msg recieved but no match in subscriptions\n";
      }

      /* topicpattern: turns a subscription filter into an anchored regex.
         "+" becomes "anything except a slash", "#" becomes "anything"; every
         other character is matched literally (escaped with preg_quote). */
      private function topicpattern($filter){
          $regex = "";
          $parts = preg_split('/([+#])/', $filter, -1, PREG_SPLIT_DELIM_CAPTURE);
          foreach($parts as $part){
              if($part === "+"){
                  $regex .= "[^\/]*";
              }elseif($part === "#"){
                  $regex .= ".*";
              }else{
                  $regex .= preg_quote($part, "/");
              }
          }
          return "/^".$regex."$/";
      }

      /* reconnect: closes the socket if it is still open, connects again
         without the clean session flag and re-sends the stored subscriptions.
         returns: true when the new connection was accepted. */
      private function reconnect(){
          if(is_resource($this->socket)) fclose($this->socket);
          if(!$this->connect(false)) return false;
          if(count($this->topics)) $this->subscribe($this->topics);
          return true;
      }

      /* proc: one step of the processing loop for an "always on" client.
         inputs: $loop: when true, sleep 0.1s if nothing was waiting on the
                        socket; pass false when the caller paces its own loop
         Reads at most one packet, hands PUBLISH packets to message(), sends a
         ping once nothing was seen for $keepalive seconds and reconnects after
         twice that.
         returns: always 1. */
      public function proc($loop = true){
          // Reconnect first if the socket is gone or the broker closed it.
          if(!is_resource($this->socket) || feof($this->socket)){
              if($this->debug) echo "eof receive going to reconnect for good measure\n";
              if(!$this->reconnect()){
                  // Nothing usable to read from; try again on the next call.
                  if($loop) usleep(100000);
                  return 1;
              }
          }

          $byte = $this->read(1, true);
          if($byte === false || $byte === ""){
              if($loop){
                  usleep(100000);
              }
          }else{
              // High nibble of the first byte is the packet type.
              $cmd = (int)(ord($byte)/16);
              if($this->debug) echo "Recevid: $cmd\n";

              // Decode the variable-length remaining length field.
              $multiplier = 1;
              $value = 0;
              do{
                  $digit = ord($this->read(1));
                  $value += ($digit & 127) * $multiplier;
                  $multiplier *= 128;
              }while(($digit & 128) != 0);
              if($this->debug) echo "Fetching: $value\n";

              // Collect the whole body with the accumulating read; a single
              // non-blocking fread() could hand back only part of it.
              $string = "";
              if($value) $string = $this->read($value);

              if($cmd){
                  if($cmd == 3) $this->message($string);
                  $this->timesinceping = time();
              }
          }

          if($this->timesinceping < (time() - $this->keepalive)){
              if($this->debug) echo "not found something so ping\n";
              $this->ping();
          }

          if($this->timesinceping < (time() - ($this->keepalive*2))){
              if($this->debug) echo "not seen a package in a while, disconnecting\n";
              $this->reconnect();
          }
          return 1;
      }

      /* getmsglength: decodes a variable-length length field.
         inputs: $msg: buffer holding the field
                 $i:   offset of its first byte; advanced past the field
         returns: the decoded length (decoding stops early if the buffer ends). */
      public function getmsglength(&$msg, &$i){
          $multiplier = 1;
          $value = 0;
          do{
              if(!isset($msg[$i])) break;   // truncated buffer
              $digit = ord($msg[$i]);
              $value += ($digit & 127) * $multiplier;
              $multiplier *= 128;
              $i++;
          }while(($digit & 128) != 0);
          return $value;
      }

      /* setmsglength: encodes $len as a variable-length length field:
         7 bits per byte, least significant group first, top bit set on every
         byte that is followed by another one.
         returns: the encoded bytes. */
      public function setmsglength($len){
          $string = "";
          do{
              $digit = $len % 128;
              $len = $len >> 7;
              // if there are more digits to encode, set the top bit of this digit
              if($len > 0)
                  $digit = ($digit | 0x80);
              $string .= chr($digit);
          }while($len > 0);
          return $string;
      }

      /* strwritestring: writes a string to a buffer.
         inputs: $str: the string to encode
                 $i:   running byte counter, increased by strlen($str) + 2
         returns: $str prefixed with its length as 2 bytes, big endian. */
      public function strwritestring($str, &$i){
          $len = strlen($str);
          $msb = $len >> 8;
          $lsb = $len % 256;
          $ret = chr($msb);
          $ret .= chr($lsb);
          $ret .= $str;
          $i += ($len+2);
          return $ret;
      }

      /* printstr: debugging aid; prints one line per byte of $string with its
         offset, bits, hex value and the character itself (bytes below 32 are
         shown as a space). */
      public function printstr($string){
          $strlen = strlen($string);
          for($j = 0; $j < $strlen; $j++){
              $num = ord($string[$j]);
              if($num > 31)
                  $chr = $string[$j];
              else
                  $chr = " ";
              printf("%4d: %08b : 0x%02x : %s \n", $j, $num, $num, $chr);
          }
      }
  }
  292. ?>