All files chunkstream.js

90.11% Statements 82/91
81.4% Branches 35/43
100% Functions 10/10
90.11% Lines 82/91

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190    1x 1x   1x 529x   529x 529x   529x 529x   529x 529x   1x   1x 13318x           13318x   13318x     13318x                 1x 943x           943x 943x         943x 943x   943x     943x 346x     943x     1x 311x       311x     311x         311x 309x   2x 2x       1x   1x 311x 3x     311x     1x 327x       327x 327x 327x   327x     1x   741x     741x     741x 568x 568x   568x     173x 173x   173x       1x 12408x   12408x 12408x 12408x     12408x 12617x 12617x   12617x 12617x     12617x 11863x         12408x 532x     12408x   12408x     1x 14263x   14263x 15656x     15656x 741x 14915x     12408x       2507x       14263x 2x            
"use strict";
 
let util = require("util");
let Stream = require("stream");
 
let ChunkStream = (module.exports = function () {
  Stream.call(this);
 
  this._buffers = [];
  this._buffered = 0;
 
  this._reads = [];
  this._paused = false;
 
  this._encoding = "utf8";
  this.writable = true;
});
util.inherits(ChunkStream, Stream);
 
/**
 * Queues a read request and schedules processing of the queue on the next tick.
 *
 * @param {number} length - Number of bytes wanted. A negative value means
 *   "at most |length| bytes": the request is then served from the first
 *   buffered chunk without waiting for the full amount.
 * @param {Function} callback - Called with `this` set to the stream and the
 *   resulting Buffer as its only argument.
 */
ChunkStream.prototype.read = function (length, callback) {
  this._reads.push({
    length: Math.abs(length), // if length < 0 then at most this length
    allowLess: length < 0,
    func: callback,
  });

  process.nextTick(() => {
    this._process();

    // The writer was paused and requests are still waiting for data, so ask
    // for more. `_reads` is null once the stream has been destroyed, which can
    // happen between the read() call and this tick, so it is checked first.
    if (this._paused && this._reads && this._reads.length > 0) {
      this._paused = false;

      this.emit("drain");
    }
  });
};
 
/**
 * Appends a chunk to the internal buffer list and tries to satisfy pending
 * read requests.
 *
 * @param {Buffer|string} data - Chunk to append. Anything that is not a
 *   Buffer is converted with `Buffer.from`.
 * @param {string} [encoding] - Encoding for non-Buffer data; falls back to
 *   the stream's `_encoding`.
 * @returns {boolean} `true` when the stream is still writable and not paused,
 *   `false` otherwise (including when the stream is not writable, in which
 *   case an "error" event is emitted and nothing is buffered).
 */
ChunkStream.prototype.write = function (data, encoding) {
  if (!this.writable) {
    this.emit("error", new Error("Stream not writable"));
    return false;
  }

  const dataBuffer = Buffer.isBuffer(data)
    ? data
    : Buffer.from(data, encoding || this._encoding);

  this._buffers.push(dataBuffer);
  this._buffered += dataBuffer.length;

  this._process();

  // Nobody is waiting for data: mark the stream paused so the return value
  // tells the writer to hold off. `_reads` is null if processing destroyed
  // the stream.
  if (this._reads && this._reads.length === 0) {
    this._paused = true;
  }

  return this.writable && !this._paused;
};
 
/**
 * Marks the stream as finished, optionally writing one last chunk first.
 *
 * @param {Buffer|string} [data] - Final chunk, forwarded to `write` when truthy.
 * @param {string} [encoding] - Encoding forwarded to `write` together with `data`.
 */
ChunkStream.prototype.end = function (data, encoding) {
  if (data) {
    this.write(data, encoding);
  }

  this.writable = false;

  // already destroyed (destroy() sets `_buffers` to null)
  if (!this._buffers) {
    return;
  }

  // Nothing buffered: finish right away. Otherwise append a null end marker
  // and let _process() handle whatever is still queued.
  if (this._buffers.length === 0) {
    this._end();
  } else {
    this._buffers.push(null);
    this._process();
  }
};
 
// `destroySoon` is an alias: it refers to the very same function as `end`.
ChunkStream.prototype.destroySoon = ChunkStream.prototype.end;
 
/**
 * Finishes the stream: reports an error if read requests are still queued,
 * then destroys the stream.
 */
ChunkStream.prototype._end = function () {
  const pendingReads = this._reads.length;

  // Requests left in the queue at this point were never satisfied.
  if (pendingReads > 0) {
    const failure = new Error("Unexpected end of input");
    this.emit("error", failure);
  }

  this.destroy();
};
 
/**
 * Tears the stream down: marks it unwritable, drops the read queue and the
 * buffered chunks, and emits "close". Calling it again is a no-op.
 */
ChunkStream.prototype.destroy = function () {
  // `_buffers` is only null after a previous destroy()
  if (!this._buffers) {
    return;
  }

  this.writable = false;
  this._reads = null;
  this._buffers = null;

  this.emit("close");
};
 
/**
 * Completes an "at most N bytes" read request using only the first buffered
 * chunk.
 *
 * @param {{length: number, func: Function}} read - The request at the head of
 *   the read queue; `length` is the maximum number of bytes to deliver.
 */
ChunkStream.prototype._processReadAllowingLess = function (read) {
  // This request gets completed now, so take it off the queue (it is the head).
  this._reads.shift();

  const head = this._buffers[0];
  const limit = read.length;
  const hasSurplus = head.length > limit;

  if (!hasSurplus) {
    // The chunk fits within the limit: hand it over whole.
    this._buffers.shift();
    this._buffered -= head.length;

    read.func.call(this, head);
    return;
  }

  // The chunk is larger than requested: keep the tail, deliver the front.
  this._buffers[0] = head.subarray(limit);
  this._buffered -= limit;

  read.func.call(this, head.subarray(0, limit));
};
 
/**
 * Completes an exact-length read request by copying bytes out of as many
 * buffered chunks as needed into one newly allocated Buffer.
 *
 * @param {{length: number, func: Function}} read - The request at the head of
 *   the read queue; `length` is the exact number of bytes to deliver.
 */
ChunkStream.prototype._processRead = function (read) {
  this._reads.shift(); // the request being served is the head of the queue

  const wanted = read.length;
  const out = Buffer.alloc(wanted);
  let filled = 0;
  let consumed = 0; // number of leading chunks copied in full

  while (filled < wanted) {
    const chunk = this._buffers[consumed];
    const take = Math.min(chunk.length, wanted - filled);

    chunk.copy(out, filled, 0, take);
    filled += take;

    if (take === chunk.length) {
      consumed += 1;
    } else {
      // Only part of this chunk was needed: leave the remainder in place.
      this._buffers[consumed] = chunk.subarray(take);
    }
  }

  // Drop every chunk that was copied in full.
  if (consumed > 0) {
    this._buffers.splice(0, consumed);
  }

  this._buffered -= wanted;

  read.func.call(this, out);
};
 
/**
 * Serves queued read requests for as long as buffered data allows, then ends
 * the stream if it is no longer writable. Anything thrown along the way is
 * reported through an "error" event instead of propagating.
 */
ChunkStream.prototype._process = function () {
  try {
    for (;;) {
      const canServe =
        this._buffered > 0 && this._reads && this._reads.length > 0;
      if (!canServe) {
        break;
      }

      const next = this._reads[0];

      if (next.allowLess) {
        // "at most N bytes" request: any available data will do
        this._processReadAllowingLess(next);
        continue;
      }

      // Exact-length request at the head of the queue: stop and wait for more
      // data unless enough is already buffered.
      if (!(this._buffered >= next.length)) {
        break;
      }

      this._processRead(next);
    }

    // Input has ended and the stream has not been destroyed yet.
    if (this._buffers && !this.writable) {
      this._end();
    }
  } catch (ex) {
    this.emit("error", ex);
  }
};