File Editor
Directories:
.. (Back)
Files:
ArrayUtil.php
CaseInsensitiveArray.php
Curl.php
Decoder.php
Encoder.php
MultiCurl.php
StringUtil.php
Url.php
Create New File
Create
Edit File: MultiCurl.php
<?php namespace Curl; include_once __DIR__ . '/ArrayUtil.php'; use Curl\ArrayUtil; class MultiCurl { public $baseUrl = null; public $multiCurl; private $curls = []; private $activeCurls = []; private $isStarted = false; private $concurrency = 25; private $nextCurlId = 0; private $rateLimit = null; private $rateLimitEnabled = false; private $rateLimitReached = false; private $maxRequests = null; private $interval = null; private $intervalSeconds = null; private $unit = null; private $currentStartTime = null; private $currentRequestCount = 0; private $beforeSendCallback = null; private $successCallback = null; private $errorCallback = null; private $completeCallback = null; private $retry = null; private $cookies = []; private $headers = []; private $options = []; private $proxies = null; private $jsonDecoder = null; private $xmlDecoder = null; /** * Construct * * @access public * @param $base_url */ public function __construct($base_url = null) { $this->multiCurl = curl_multi_init(); $this->headers = new CaseInsensitiveArray(); $this->setUrl($base_url); } /** * Add Delete * * @access public * @param $url * @param $query_parameters * @param $data * * @return object */ public function addDelete($url, $query_parameters = [], $data = []) { if (is_array($url)) { $data = $query_parameters; $query_parameters = $url; $url = $this->baseUrl; } $curl = new Curl(); $this->queueHandle($curl); $curl->setUrl($url, $query_parameters); $curl->setOpt(CURLOPT_CUSTOMREQUEST, 'DELETE'); $curl->setOpt(CURLOPT_POSTFIELDS, $curl->buildPostData($data)); return $curl; } /** * Add Download * * @access public * @param $url * @param $mixed_filename * * @return object */ public function addDownload($url, $mixed_filename) { $curl = new Curl(); $this->queueHandle($curl); $curl->setUrl($url); // Use tmpfile() or php://temp to avoid "Too many open files" error. if (is_callable($mixed_filename)) { $curl->downloadCompleteCallback = $mixed_filename; $curl->downloadFileName = null; $curl->fileHandle = tmpfile(); } else { $filename = $mixed_filename; // Use a temporary file when downloading. Not using a temporary file can cause an error when an existing // file has already fully completed downloading and a new download is started with the same destination save // path. The download request will include header "Range: bytes=$filesize-" which is syntactically valid, // but unsatisfiable. $download_filename = $filename . '.pccdownload'; $this->downloadFileName = $download_filename; // Attempt to resume download only when a temporary download file exists and is not empty. if (is_file($download_filename) && $filesize = filesize($download_filename)) { $first_byte_position = $filesize; $range = $first_byte_position . '-'; $curl->setRange($range); $curl->fileHandle = fopen($download_filename, 'ab'); // Move the downloaded temporary file to the destination save path. $curl->downloadCompleteCallback = function ($instance, $fh) use ($download_filename, $filename) { // Close the open file handle before renaming the file. if (is_resource($fh)) { fclose($fh); } rename($download_filename, $filename); }; } else { $curl->fileHandle = fopen('php://temp', 'wb'); $curl->downloadCompleteCallback = function ($instance, $fh) use ($filename) { file_put_contents($filename, stream_get_contents($fh)); }; } } $curl->setFile($curl->fileHandle); $curl->setOpt(CURLOPT_CUSTOMREQUEST, 'GET'); $curl->setOpt(CURLOPT_HTTPGET, true); return $curl; } /** * Add Get * * @access public * @param $url * @param $data * * @return object */ public function addGet($url, $data = []) { if (is_array($url)) { $data = $url; $url = $this->baseUrl; } $curl = new Curl(); $this->queueHandle($curl); $curl->setUrl($url, $data); $curl->setOpt(CURLOPT_CUSTOMREQUEST, 'GET'); $curl->setOpt(CURLOPT_HTTPGET, true); return $curl; } /** * Add Head * * @access public * @param $url * @param $data * * @return object */ public function addHead($url, $data = []) { if (is_array($url)) { $data = $url; $url = $this->baseUrl; } $curl = new Curl(); $this->queueHandle($curl); $curl->setUrl($url, $data); $curl->setOpt(CURLOPT_CUSTOMREQUEST, 'HEAD'); $curl->setOpt(CURLOPT_NOBODY, true); return $curl; } /** * Add Options * * @access public * @param $url * @param $data * * @return object */ public function addOptions($url, $data = []) { if (is_array($url)) { $data = $url; $url = $this->baseUrl; } $curl = new Curl(); $this->queueHandle($curl); $curl->setUrl($url, $data); $curl->removeHeader('Content-Length'); $curl->setOpt(CURLOPT_CUSTOMREQUEST, 'OPTIONS'); return $curl; } /** * Add Patch * * @access public * @param $url * @param $data * * @return object */ public function addPatch($url, $data = []) { if (is_array($url)) { $data = $url; $url = $this->baseUrl; } $curl = new Curl(); if (is_array($data) && empty($data)) { $curl->removeHeader('Content-Length'); } $this->queueHandle($curl); $curl->setUrl($url); $curl->setOpt(CURLOPT_CUSTOMREQUEST, 'PATCH'); $curl->setOpt(CURLOPT_POSTFIELDS, $curl->buildPostData($data)); return $curl; } /** * Add Post * * @access public * @param $url * @param $data * @param $follow_303_with_post * If true, will cause 303 redirections to be followed using GET requests (default: false). * Note: Redirections are only followed if the CURLOPT_FOLLOWLOCATION option is set to true. * * @return object */ public function addPost($url, $data = '', $follow_303_with_post = false) { if (is_array($url)) { $follow_303_with_post = (bool)$data; $data = $url; $url = $this->baseUrl; } $curl = new Curl(); $this->queueHandle($curl); if (is_array($data) && empty($data)) { $curl->removeHeader('Content-Length'); } $curl->setUrl($url); /* * For post-redirect-get requests, the CURLOPT_CUSTOMREQUEST option must not * be set, otherwise cURL will perform POST requests for redirections. */ if (!$follow_303_with_post) { $curl->setOpt(CURLOPT_CUSTOMREQUEST, 'POST'); } $curl->setOpt(CURLOPT_POST, true); $curl->setOpt(CURLOPT_POSTFIELDS, $curl->buildPostData($data)); return $curl; } /** * Add Put * * @access public * @param $url * @param $data * * @return object */ public function addPut($url, $data = []) { if (is_array($url)) { $data = $url; $url = $this->baseUrl; } $curl = new Curl(); $this->queueHandle($curl); $curl->setUrl($url); $curl->setOpt(CURLOPT_CUSTOMREQUEST, 'PUT'); $put_data = $curl->buildPostData($data); if (is_string($put_data)) { $curl->setHeader('Content-Length', strlen($put_data)); } $curl->setOpt(CURLOPT_POSTFIELDS, $put_data); return $curl; } /** * Add Search * * @access public * @param $url * @param $data * * @return object */ public function addSearch($url, $data = []) { if (is_array($url)) { $data = $url; $url = $this->baseUrl; } $curl = new Curl(); $this->queueHandle($curl); $curl->setUrl($url); $curl->setOpt(CURLOPT_CUSTOMREQUEST, 'SEARCH'); $put_data = $curl->buildPostData($data); if (is_string($put_data)) { $curl->setHeader('Content-Length', strlen($put_data)); } $curl->setOpt(CURLOPT_POSTFIELDS, $put_data); return $curl; } /** * Add Curl * * Add a Curl instance to the handle queue. * * @access public * @param $curl * * @return object */ public function addCurl(Curl $curl) { $this->queueHandle($curl); return $curl; } /** * Before Send * * @access public * @param $callback */ public function beforeSend($callback) { $this->beforeSendCallback = $callback; } /** * Close * * @access public */ public function close() { foreach ($this->curls as $curl) { $curl->close(); } if (is_resource($this->multiCurl)) { curl_multi_close($this->multiCurl); } } /** * Complete * * @access public * @param $callback */ public function complete($callback) { $this->completeCallback = $callback; } /** * Error * * @access public * @param $callback */ public function error($callback) { $this->errorCallback = $callback; } /** * Get Opt * * @access public * @param $option * * @return mixed */ public function getOpt($option) { return isset($this->options[$option]) ? $this->options[$option] : null; } /** * Set Basic Authentication * * @access public * @param $username * @param $password */ public function setBasicAuthentication($username, $password = '') { $this->setOpt(CURLOPT_HTTPAUTH, CURLAUTH_BASIC); $this->setOpt(CURLOPT_USERPWD, $username . ':' . $password); } /** * Set Concurrency * * @access public * @param $concurrency */ public function setConcurrency($concurrency) { $this->concurrency = $concurrency; } /** * Set Digest Authentication * * @access public * @param $username * @param $password */ public function setDigestAuthentication($username, $password = '') { $this->setOpt(CURLOPT_HTTPAUTH, CURLAUTH_DIGEST); $this->setOpt(CURLOPT_USERPWD, $username . ':' . $password); } /** * Set Cookie * * @access public * @param $key * @param $value */ public function setCookie($key, $value) { $this->cookies[$key] = $value; } /** * Set Cookies * * @access public * @param $cookies */ public function setCookies($cookies) { foreach ($cookies as $key => $value) { $this->cookies[$key] = $value; } } /** * Set Port * * @access public * @param $port */ public function setPort($port) { $this->setOpt(CURLOPT_PORT, intval($port)); } /** * Set Connect Timeout * * @access public * @param $seconds */ public function setConnectTimeout($seconds) { $this->setOpt(CURLOPT_CONNECTTIMEOUT, $seconds); } /** * Set Cookie String * * @access public * @param $string */ public function setCookieString($string) { $this->setOpt(CURLOPT_COOKIE, $string); } /** * Set Cookie File * * @access public * @param $cookie_file */ public function setCookieFile($cookie_file) { $this->setOpt(CURLOPT_COOKIEFILE, $cookie_file); } /** * Set Cookie Jar * * @access public * @param $cookie_jar */ public function setCookieJar($cookie_jar) { $this->setOpt(CURLOPT_COOKIEJAR, $cookie_jar); } /** * Set File * * @access public * @param $file */ public function setFile($file) { $this->setOpt(CURLOPT_FILE, $file); } /** * Set Header * * Add extra header to include in the request. * * @access public * @param $key * @param $value */ public function setHeader($key, $value) { $this->headers[$key] = $value; $this->updateHeaders(); } /** * Set Headers * * Add extra headers to include in the request. * * @access public * @param $headers */ public function setHeaders($headers) { if (ArrayUtil::isArrayAssoc($headers)) { foreach ($headers as $key => $value) { $key = trim($key); $value = trim($value); $this->headers[$key] = $value; } } else { foreach ($headers as $header) { list($key, $value) = explode(':', $header, 2); $key = trim($key); $value = trim($value); $this->headers[$key] = $value; } } $this->updateHeaders(); } /** * Set JSON Decoder * * @access public * @param $mixed boolean|callable */ public function setJsonDecoder($mixed) { if ($mixed === false) { $this->jsonDecoder = false; } elseif (is_callable($mixed)) { $this->jsonDecoder = $mixed; } } /** * Set XML Decoder * * @access public * @param $mixed boolean|callable */ public function setXmlDecoder($mixed) { if ($mixed === false) { $this->xmlDecoder = false; } elseif (is_callable($mixed)) { $this->xmlDecoder = $mixed; } } /** * Set Proxy * * Set an HTTP proxy to tunnel requests through. * * @access public * @param $proxy - The HTTP proxy to tunnel requests through. May include port number. * @param $port - The port number of the proxy to connect to. This port number can also be set in $proxy. * @param $username - The username to use for the connection to the proxy. * @param $password - The password to use for the connection to the proxy. */ public function setProxy($proxy, $port = null, $username = null, $password = null) { $this->setOpt(CURLOPT_PROXY, $proxy); if ($port !== null) { $this->setOpt(CURLOPT_PROXYPORT, $port); } if ($username !== null && $password !== null) { $this->setOpt(CURLOPT_PROXYUSERPWD, $username . ':' . $password); } } /** * Set Proxies * * Set proxies to tunnel requests through. When set, a random proxy will be * used for the request. * * @access public * @param $proxies array - A list of HTTP proxies to tunnel requests * through. May include port number. */ public function setProxies($proxies) { $this->proxies = $proxies; } /** * Set Proxy Auth * * Set the HTTP authentication method(s) to use for the proxy connection. * * @access public * @param $auth */ public function setProxyAuth($auth) { $this->setOpt(CURLOPT_PROXYAUTH, $auth); } /** * Set Proxy Type * * Set the proxy protocol type. * * @access public * @param $type */ public function setProxyType($type) { $this->setOpt(CURLOPT_PROXYTYPE, $type); } /** * Set Proxy Tunnel * * Set the proxy to tunnel through HTTP proxy. * * @access public * @param $tunnel boolean */ public function setProxyTunnel($tunnel = true) { $this->setOpt(CURLOPT_HTTPPROXYTUNNEL, $tunnel); } /** * Unset Proxy * * Disable use of the proxy. * * @access public */ public function unsetProxy() { $this->setOpt(CURLOPT_PROXY, null); } /** * Set Opt * * @access public * @param $option * @param $value */ public function setOpt($option, $value) { $this->options[$option] = $value; } /** * Set Opts * * @access public * @param $options */ public function setOpts($options) { foreach ($options as $option => $value) { $this->setOpt($option, $value); } } /** * Set Range * * @access public * @param $range */ public function setRange($range) { $this->setOpt(CURLOPT_RANGE, $range); } /** * Set Rate Limit * * @access public * @param $rate_limit string (e.g. "60/1m"). * @throws \UnexpectedValueException */ public function setRateLimit($rate_limit) { $rate_limit_pattern = '/' . // delimiter '^' . // assert start '(\d+)' . // digit(s) '\/' . // slash '(\d+)?' . // digit(s), optional '(s|m|h)' . // unit, s for seconds, m for minutes, h for hours '$' . // assert end '/' . // delimiter 'i' . // case-insensitive matches ''; if (!preg_match($rate_limit_pattern, $rate_limit, $matches)) { throw new \UnexpectedValueException( 'rate limit must be formatted as $max_requests/$interval(s|m|h) ' . '(e.g. "60/1m" for a maximum of 60 requests per 1 minute)' ); } $max_requests = (int)$matches['1']; if ($matches['2'] === '') { $interval = 1; } else { $interval = (int)$matches['2']; } $unit = strtolower($matches['3']); // Convert interval to seconds based on unit. if ($unit === 's') { $interval_seconds = $interval * 1; } elseif ($unit === 'm') { $interval_seconds = $interval * 60; } elseif ($unit === 'h') { $interval_seconds = $interval * 3600; } $this->rateLimit = $max_requests . '/' . $interval . $unit; $this->rateLimitEnabled = true; $this->maxRequests = $max_requests; $this->interval = $interval; $this->intervalSeconds = $interval_seconds; $this->unit = $unit; } /** * Set Referer * * @access public * @param $referer */ public function setReferer($referer) { $this->setReferrer($referer); } /** * Set Referrer * * @access public * @param $referrer */ public function setReferrer($referrer) { $this->setOpt(CURLOPT_REFERER, $referrer); } /** * Set Retry * * Number of retries to attempt or decider callable. * * When using a number of retries to attempt, the maximum number of attempts * for the request is $maximum_number_of_retries + 1. * * When using a callable decider, the request will be retried until the * function returns a value which evaluates to false. * * @access public * @param $mixed */ public function setRetry($mixed) { $this->retry = $mixed; } /** * Set Timeout * * @access public * @param $seconds */ public function setTimeout($seconds) { $this->setOpt(CURLOPT_TIMEOUT, $seconds); } /** * Disable Timeout * * @access public */ public function disableTimeout() { $this->setTimeout(null); } /** * Set Url * * @access public * @param $url */ public function setUrl($url) { $this->baseUrl = $url; } /** * Set User Agent * * @access public * @param $user_agent */ public function setUserAgent($user_agent) { $this->setOpt(CURLOPT_USERAGENT, $user_agent); } /** * Set Interface * * The name of the outgoing network interface to use. * This can be an interface name, an IP address or a host name. * * @access public * @param $interface */ public function setInterface($interface) { $this->setOpt(CURLOPT_INTERFACE, $interface); } /** * Start * * @access public * @throws \ErrorException */ public function start() { if ($this->isStarted) { return; } $this->isStarted = true; $this->currentStartTime = microtime(true); $this->currentRequestCount = 0; do { while (count($this->curls) && count($this->activeCurls) < $this->concurrency && (!$this->rateLimitEnabled || $this->hasRequestQuota()) ) { $this->initHandle(); } if ($this->rateLimitEnabled && !count($this->activeCurls) && !$this->hasRequestQuota()) { $this->waitUntilRequestQuotaAvailable(); } // Wait for activity on any curl_multi connection when curl_multi_select (libcurl) fails to correctly block. // https://bugs.php.net/bug.php?id=63411 if (curl_multi_select($this->multiCurl) === -1) { usleep(100000); } curl_multi_exec($this->multiCurl, $active); while (($info_array = curl_multi_info_read($this->multiCurl)) !== false) { if ($info_array['msg'] === CURLMSG_DONE) { foreach ($this->activeCurls as $key => $curl) { if ($curl->curl === $info_array['handle']) { // Set the error code for multi handles using the "result" key in the array returned by // curl_multi_info_read(). Using curl_errno() on a multi handle will incorrectly return 0 // for errors. $curl->curlErrorCode = $info_array['result']; $curl->exec($curl->curl); if ($curl->attemptRetry()) { // Remove completed handle before adding again in order to retry request. curl_multi_remove_handle($this->multiCurl, $curl->curl); $curlm_error_code = curl_multi_add_handle($this->multiCurl, $curl->curl); if ($curlm_error_code !== CURLM_OK) { throw new \ErrorException( 'cURL multi add handle error: ' . curl_multi_strerror($curlm_error_code) ); } } else { $curl->execDone(); // Remove completed instance from active curls. unset($this->activeCurls[$key]); // Remove handle of the completed instance. curl_multi_remove_handle($this->multiCurl, $curl->curl); // Clean up completed instance. $curl->close(); } break; } } } } } while ($active || count($this->activeCurls) || count($this->curls)); $this->isStarted = false; } /** * Success * * @access public * @param $callback */ public function success($callback) { $this->successCallback = $callback; } /** * Unset Header * * Remove extra header previously set using Curl::setHeader(). * * @access public * @param $key */ public function unsetHeader($key) { unset($this->headers[$key]); } /** * Remove Header * * Remove an internal header from the request. * Using `curl -H "Host:" ...' is equivalent to $curl->removeHeader('Host');. * * @access public * @param $key */ public function removeHeader($key) { $this->setHeader($key, ''); } /** * Verbose * * @access public * @param bool $on * @param resource $output */ public function verbose($on = true, $output = STDERR) { // Turn off CURLINFO_HEADER_OUT for verbose to work. This has the side // effect of causing Curl::requestHeaders to be empty. if ($on) { $this->setOpt(CURLINFO_HEADER_OUT, false); } $this->setOpt(CURLOPT_VERBOSE, $on); $this->setOpt(CURLOPT_STDERR, $output); } /** * Destruct * * @access public */ public function __destruct() { $this->close(); } /** * Update Headers * * @access private */ private function updateHeaders() { foreach ($this->curls as $curl) { $curl->setHeaders($this->headers); } } /** * Queue Handle * * @access private * @param $curl */ private function queueHandle($curl) { // Use sequential ids to allow for ordered post processing. $curl->id = $this->nextCurlId++; $curl->childOfMultiCurl = true; $this->curls[$curl->id] = $curl; $curl->setHeaders($this->headers); } /** * Init Handle * * @access private * @param $curl * @throws \ErrorException */ private function initHandle() { $curl = array_shift($this->curls); if ($curl === null) { return; } // Add instance to list of active curls. $this->currentRequestCount += 1; $this->activeCurls[$curl->id] = $curl; // Set callbacks if not already individually set. if ($curl->beforeSendCallback === null) { $curl->beforeSend($this->beforeSendCallback); } if ($curl->successCallback === null) { $curl->success($this->successCallback); } if ($curl->errorCallback === null) { $curl->error($this->errorCallback); } if ($curl->completeCallback === null) { $curl->complete($this->completeCallback); } // Set decoders if not already individually set. if ($curl->jsonDecoder === null) { $curl->setJsonDecoder($this->jsonDecoder); } if ($curl->xmlDecoder === null) { $curl->setXmlDecoder($this->xmlDecoder); } $curl->setOpts($this->options); $curl->setRetry($this->retry); $curl->setCookies($this->cookies); // Use a random proxy for the curl instance when proxies have been set // and the curl instance doesn't already have a proxy set. if (is_array($this->proxies) && $curl->getOpt(CURLOPT_PROXY) === null) { $random_proxy = ArrayUtil::arrayRandom($this->proxies); $curl->setProxy($random_proxy); } $curlm_error_code = curl_multi_add_handle($this->multiCurl, $curl->curl); if ($curlm_error_code !== CURLM_OK) { throw new \ErrorException('cURL multi add handle error: ' . curl_multi_strerror($curlm_error_code)); } $curl->call($curl->beforeSendCallback); } /** * Has Request Quota * * Checks if there is any available quota to make additional requests while * rate limiting is enabled. * * @access private */ private function hasRequestQuota() { // Calculate if there's request quota since ratelimiting is enabled. if ($this->rateLimitEnabled) { // Determine if the limit of requests per interval has been reached. if ($this->currentRequestCount >= $this->maxRequests) { $elapsed_seconds = microtime(true) - $this->currentStartTime; if ($elapsed_seconds <= $this->intervalSeconds) { $this->rateLimitReached = true; return false; } elseif ($this->rateLimitReached) { $this->rateLimitReached = false; $this->currentStartTime = microtime(true); $this->currentRequestCount = 0; } } return true; } else { return true; } } /** * Wait Until Request Quota Available * * Waits until there is available request quota available based on the rate limit. * * @access private */ private function waitUntilRequestQuotaAvailable() { $sleep_until = $this->currentStartTime + $this->intervalSeconds; $sleep_seconds = $sleep_until - microtime(true); // Avoid using time_sleep_until() as it appears to be less precise and not sleep long enough. usleep($sleep_seconds * 1000000); $this->currentStartTime = microtime(true); $this->currentRequestCount = 0; } }
Save Changes
Rename File
Rename