1: <?php
2:
3: namespace Budabot\Core;
4:
5: use stdClass;
6:
7: 8: 9: 10: 11: 12: 13:
14: class AsyncHttp {
15:
16:
17: public $setting;
18:
19:
20: public $socketManager;
21:
22:
23: public $timer;
24:
25:
26: public $logger;
27:
28:
29: private $uri;
30: private $callback;
31: private $data;
32: private $headers = array();
33: private $timeout = null;
34: private $queryParams = array();
35:
36:
37: private $stream;
38: private $notifier;
39: private $requestData = '';
40: private $responseData = '';
41: private $headersEndPos = false;
42: private $responseHeaders = array();
43:
44: private $request;
45: private $errorString = false;
46: private $timeoutEvent = null;
47: private $finished;
48: private $loop;
49:
50:
51:
52: static public $overrideAddress = null;
53:
54: static public $overridePort = null;
55:
56: 57: 58: 59: 60:
61: public function __construct($method, $uri) {
62: $this->method = $method;
63: $this->uri = $uri;
64: $this->finished = false;
65: }
66:
67: 68: 69: 70:
71: public function execute() {
72: if (!$this->buildRequest()) {
73: return;
74: }
75:
76: $this->initTimeout();
77:
78: if (!$this->createStream()) {
79: return;
80: }
81: $this->setupStreamNotify();
82:
83: $this->logger->log('DEBUG', "Sending request: {$this->request->getData()}");
84: }
85:
86: private function buildRequest() {
87: try {
88: $this->request = new HttpRequest($this->method, $this->uri, $this->queryParams, $this->headers);
89: $this->requestData = $this->request->getData();
90: } catch (InvalidHttpRequest $e) {
91: $this->abortWithMessage($e->getMessage());
92: return false;
93: }
94: return true;
95: }
96:
97: 98: 99:
100: public function abortWithMessage($errorString) {
101: $this->setError($errorString . " for uri: '" . $this->uri . "' with params: '" . http_build_query($this->queryParams) . "'");
102: $this->finish();
103: }
104:
105: 106: 107: 108: 109:
110: private function setError($errorString) {
111: $this->errorString = $errorString;
112: $this->logger->log('ERROR', $errorString);
113: }
114:
115: private function finish() {
116: $this->finished = true;
117: if ($this->timeoutEvent) {
118: $this->timer->abortEvent($this->timeoutEvent);
119: $this->timeoutEvent = null;
120: }
121: $this->close();
122: $this->callCallback();
123: }
124:
125: 126: 127:
128: private function close() {
129: $this->socketManager->removeSocketNotifier($this->notifier);
130: $this->notifier = null;
131: fclose($this->stream);
132: }
133:
134: 135: 136:
137: private function callCallback() {
138: if ($this->callback !== null) {
139: $response = $this->buildResponse();
140: call_user_func($this->callback, $response, $this->data);
141: }
142: }
143:
144: private function buildResponse() {
145: $response = new StdClass();
146: if (empty($this->errorString)) {
147: $response->headers = $this->responseHeaders;
148: $response->body = $this->getResponseBody();
149: } else {
150: $response->error = $this->errorString;
151: }
152:
153: return $response;
154: }
155:
156: private function initTimeout() {
157: if ($this->timeout === null) {
158: $this->timeout = $this->setting->http_timeout;
159: }
160:
161: $this->timeoutEvent = $this->timer->callLater($this->timeout, array($this, 'abortWithMessage'),
162: "Timeout error after waiting {$this->timeout} seconds");
163: }
164:
165: private function createStream() {
166: $this->stream = stream_socket_client($this->getStreamUri(), $errno, $errstr, 10, $this->getStreamFlags());
167: if ($this->stream === false) {
168: $this->abortWithMessage("Failed to create socket stream, reason: $errstr ($errno)");
169: return false;
170: }
171: stream_set_blocking($this->stream, 0);
172: return true;
173: }
174:
175: private function getStreamUri() {
176: $scheme = $this->request->getScheme();
177: $host = self::$overrideAddress ? self::$overrideAddress : $this->request->getHost();
178: $port = self::$overridePort ? self::$overridePort : $this->request->getPort();
179: return "$scheme://$host:$port";
180: }
181:
182: private function getStreamFlags() {
183: $flags = STREAM_CLIENT_ASYNC_CONNECT | STREAM_CLIENT_CONNECT;
184:
185:
186: if ($this->request->getScheme() == 'ssl' && strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') {
187: $flags = STREAM_CLIENT_CONNECT;
188: }
189: return $flags;
190: }
191:
192: private function setupStreamNotify() {
193:
194: $this->notifier = new SocketNotifier(
195: $this->stream,
196: SocketNotifier::ACTIVITY_READ | SocketNotifier::ACTIVITY_WRITE | SocketNotifier::ACTIVITY_ERROR,
197: array($this, 'onStreamActivity')
198: );
199: $this->socketManager->addSocketNotifier($this->notifier);
200: }
201:
202: 203: 204: 205: 206: 207:
208: public function onStreamActivity($type) {
209: if ($this->finished) {
210: return;
211: }
212:
213: switch ($type) {
214: case SocketNotifier::ACTIVITY_READ:
215: $this->processResponse();
216: break;
217:
218: case SocketNotifier::ACTIVITY_WRITE:
219: $this->processRequest();
220: break;
221:
222: case SocketNotifier::ACTIVITY_ERROR:
223: $this->abortWithMessage('Socket error occurred');
224: break;
225: }
226: }
227:
228: private function processResponse() {
229: $this->responseData .= $this->readAllFromSocket();
230:
231: if (!$this->areHeadersReceived()) {
232: $this->processHeaders();
233: }
234:
235: if ($this->isBodyLengthKnown()) {
236: if ($this->isBodyFullyReceived()) {
237: $this->finish();
238: } else if ($this->isStreamClosed()) {
239: $this->abortWithMessage("Stream closed before receiving all data");
240: }
241: } else if ($this->isStreamClosed()) {
242: $this->finish();
243: }
244: }
245:
246: private function processHeaders() {
247: $this->headersEndPos = strpos($this->responseData, "\r\n\r\n");
248: if ($this->headersEndPos !== false) {
249: $headerData = substr($this->responseData, 0, $this->headersEndPos);
250: $this->responseHeaders = $this->extractHeadersFromHeaderData($headerData);
251: }
252: }
253:
254: private function getResponseBody() {
255: return substr($this->responseData, $this->headersEndPos + 4);
256: }
257:
258: private function areHeadersReceived() {
259: return $this->headersEndPos !== false;
260: }
261:
262: private function isStreamClosed() {
263: return feof($this->stream);
264: }
265:
266: private function isBodyFullyReceived() {
267: return $this->getBodyLength() <= strlen($this->getResponseBody());
268: }
269:
270: private function isBodyLengthKnown() {
271: return $this->getBodyLength() !== null;
272: }
273:
274: private function readAllFromSocket() {
275: $data = '';
276: while (true) {
277: $chunk = fread($this->stream, 8192);
278: if ($chunk === false) {
279: $this->abortWithMessage("Failed to read from the stream for uri '{$this->uri}'");
280: break;
281: }
282: if (strlen($chunk) == 0) {
283: break;
284: }
285: $data .= $chunk;
286: }
287:
288: if (!empty($data)) {
289:
290: $this->timer->restartEvent($this->timeoutEvent);
291: }
292:
293: return $data;
294: }
295:
296: private function getBodyLength() {
297: return isset($this->responseHeaders['content-length']) ? intval($this->responseHeaders['content-length']) : null;
298: }
299:
300: private function extractHeadersFromHeaderData($data) {
301: $headers = array();
302: $lines = explode("\r\n", $data);
303: list($version, $status, $statusMessage) = explode(" ", array_shift($lines), 3);
304: $headers['http-version'] = $version;
305: $headers['status-code'] = $status;
306: $headers['status-message'] = $statusMessage;
307: forEach ($lines as $line) {
308: if (preg_match('/([^:]+):(.+)/', $line, $matches)) {
309: $headers[strtolower(trim($matches[1]))] = trim($matches[2]);
310: }
311: }
312: return $headers;
313: }
314:
315: private function processRequest() {
316: if ($this->requestData) {
317: $written = fwrite($this->stream, $this->requestData);
318: if ($written === false) {
319: $this->abortWithMessage("Cannot write request headers for uri '{$this->uri}' to stream");
320: } else if ($written > 0) {
321: $this->requestData = substr($this->requestData, $written);
322:
323:
324: $this->timer->restartEvent($this->timeoutEvent);
325: }
326: }
327: }
328:
329: 330: 331: 332: 333:
334: public function withHeader($header, $value) {
335: $this->headers[$header] = $value;
336: return $this;
337: }
338:
339: 340: 341: 342:
343: public function withTimeout($timeout) {
344: $this->timeout = $timeout;
345: return $this;
346: }
347:
348: 349: 350: 351: 352: 353: 354: 355: 356: 357: 358: 359: 360: 361: 362: 363: 364:
365: public function withCallback($callback, $data = null) {
366: $this->callback = $callback;
367: $this->data = $data;
368: return $this;
369: }
370:
371: 372: 373: 374:
375: public function withQueryParams($params) {
376: $this->queryParams = $params;
377: return $this;
378: }
379:
380: 381: 382: 383: 384: 385: 386:
387: public function waitAndReturnResponse() {
388:
389: $this->loop = new EventLoop();
390: Registry::injectDependencies($this->loop);
391: while (!$this->finished) {
392: $this->loop->execSingleLoop();
393: }
394:
395: return $this->buildResponse();
396: }
397: }
398: