var Readable = require('stream').Readable,
    Writable = require('stream').Writable,
    Transform = require('stream').Transform,
    util = require('util');

// we can almost think of a READABLE stream as a 'lazy' loop that gets iterated by the caller
// each call to read is an explicit instruction to iterate the loop
var ReadStream = function() {
  Readable.call(this, {objectMode: true});
  this.currChar = 97; // 'a'
};
util.inherits(ReadStream, Readable);


ReadStream.prototype._read = function() {
  // stream end condition
  if (this.currChar > 'z'.charCodeAt(0)) {
    this.push(null);
  }
  else {
      // one iteration of the underlying data character
      this.push(String.fromCharCode(this.currChar));
      this.currChar++;
  }

};


/**
 * //////////////////////////////////////////
 * // same as process.stdout. writes objects
 * //////////////////////////////////////////
 */

var WriteStream = function() {
  Writable.call(this, {objectMode: true});
};
util.inherits(WriteStream, Writable);


WriteStream.prototype._write = function(chunk, encoding, callback) {
  console.log(chunk);
  callback();
};



var TransformStream_ChunkByN = function(n) {
  Transform.call(this, {objectMode: true});

  if(n === void 0 || n === null || n === 0) {
    throw new Error('invalid buffer size');
  }

  this.chunkSize = n;
  this.localBuffer = [];
};
util.inherits(TransformStream_ChunkByN, Transform);

TransformStream_ChunkByN.prototype.flushBuffer = function() {
    this.push(this.localBuffer);
    this.localBuffer = [];
};

TransformStream_ChunkByN.prototype._transform = function(chunk, encoding, callback) {

  this.localBuffer.push(chunk);

  if(this.localBuffer.length === this.chunkSize) {
    this.flushBuffer();
  }

  callback();
};

// Usage
var rs = new ReadStream();
var trans = new TransformStream_ChunkByN(3);
var ws = new WriteStream();

rs.pipe(trans).pipe(ws);