目录

短小且优雅的Promise并发控制实现

前言

Promise是前端工程师写代码最常用的知识点,而手写代码实现Promise的并发控制算是其中出现频繁且稍微难度高一点的题目。

实现版本一

这里给出一个参考实现,出自这里,代码只有十几行,但实现的非常巧妙。

async function asyncPool(poolLimit, array, iteratorFn) {
  const ret = [];               //2
  const executing = [];         //3
  for (const item of array) {   //4
    const p = Promise.resolve().then(() => iteratorFn(item));  //5
    ret.push(p);                //6
    if (poolLimit <= array.length) { //7
      const e = p.then(() => executing.splice(executing.indexOf(e), 1)); //8
      executing.push(e);        //9
      if (executing.length >= poolLimit) {  //10
        await Promise.race(executing);      //11
      }
    }
  }
  return Promise.all(ret);     //15
}

代码虽然不多,但需要对Promise非常熟悉才能理解,下面就模拟代码执行跑一遍执行过程。

假设 poolLimit = 3, array是一个长度为10的url列表, iteratorFn是一个返回Promose对象的函数用于发送请求,模拟一下执行过程:

  1. line2:创建数组ret,用于存放全部的Promise对象
  2. line3:创建数组execting,用于存放并发限制的处于Pending状态的Promise对象
  3. line4:item是array的第一项
  4. line5: iteratorFn(item) 得到一个pending状态的Promise对象 p。(这里之所以不直接 p = iteratorFn(item),是为了兼容iteratorFn是同步函数的场景,保证返回的p一定是Promose对象,见下方测试代码)
  5. line6:p放入ret
  6. line7:如果限制数量poolLimit 小于等于 数组的总长度再执行限制。当前poolLimit=3,arr.length=10,进入if逻辑
  7. line8:根据刚刚的p创建一个Promise对象e,等p resolve的时候才执行then里的回调,把e从executing数组移除(PS:目前e还没放入数组,在line9会放进去)
  8. line9:把e放入executing
  9. line10:目前executing长度小于poolLimit限制长度3,不进入if,回到line4执行下一次循环
  10. .... 循环执行到第3次时,到达line10,此时ret数组为[p1_pending, p2_pending, p3_pending],executing数组为[e1_pending, e2_pending, e3_pending],其中p1_pending 的resolve会触发e1的移出和resolve(第8行then里的箭头函数执行完e就resovle)
  11. line11:卡住,等待executing 数组里的[e1,e2,e3]看哪个最快resolve 。假设p2最先resolve,p2的resolve触发e2的resolve,当e2 resolve 之后,Promise.race(executing)得到结果, 此刻回到line4,for循环才进入下一轮,execting数组里为[e1_pending, e3_pending],ret数组为[p1pending, p2_fulfilled, p3_pending]。
  12. line4:... ,继续下一轮循环,execting数组始终保持最多不超过3个
  13. ...
  14. line15:当for循环结束之后,ret数组里包含全部arr封装的promise对象,返回Promise.all(ret),得到新的Promise对象(等ret所有的全resolve后该Promise对象才resolve,同时得到所有数据)

测试

以下是测试代码:

const curl = (i) => {
  console.log('开始' + i);
  return new Promise((resolve) => setTimeout(() => {
    resolve(i);
    console.log('结束' + i);
  }, 1000+Math.random()*1000));
};
 
/*
const curl = (i) => { 
  console.log('开始' + i);
  return i;
};
*/
let urls = Array(10).fill(0).map((v,i) => i);
(async () => {
    const res = await asyncPool(3, urls, curl);
    console.log(res);
 })();

实现版本二

这个版本的代码理解起来更简单,没有使用async、await。

function asyncPool(fn, arr, limit = 10) {
  let args = [...arr]   //不修改原参数数组
  let results = []      //存放最终结果
  let runningCount = 0  //正在运行的数量
  let resultIndex = 0   //结果的下标,用于控制结果的顺序
  let resultCount = 0   //结果的数量
 
  return new Promise((resolve) => {
    function run() {
      while(runningCount < limit && args.length > 0) {
        runningCount++
        ((i)=> {        //闭包用于保存结果下标,便于在resolve时把结果放到合适的位置
          let v = args.shift()
          console.log('正在运行' + runningCount)
          fn(v).then(val => {
            results[i] = val
          }, () => {
            throw new Error(`An error occurred: ${v}`)
          }).finally(() => {
            runningCount--
            resultCount++
            if(resultCount === arr.length) {  //这里之所以用resultCount做判断,而不用results的长度和args的长度,是因为这两个都不准确
              resolve(results)
            } else {
              run()
            }
          })          
        })(resultIndex++)
      }
    }
    run()
  })
}
 
 
//测试
function getWeather(city) {
  console.log(`开始获取${city}的天气`)
  return fetch(`https://api2.jirengu.com/getWeather.php?city=${city}`).then(res=> res.json())
}
 
let citys = ['北京', '上海', '杭州', '成都', '武汉', '天津', '深圳', '广州', '合肥', '郑州']
asyncPool(getWeather, citys, 2).then(results => console.log(results))

我是若愚老师,欢迎来饥人谷学习前端。

饥人谷一直致力于培养有灵魂的编程者,打造专业有爱的国内前端技术圈子。如造梦师一般帮助近千名不甘寂寞的追梦人把编程梦变为现实,他们以饥人谷为起点,足迹遍布包括facebook、阿里巴巴、百度、网易、京东、今日头条、大众美团、饿了么、ofo在内的国内外大小企业。 了解培训课程:加微信 xiedaimala03,官网:https://jirengu.com