import webapp2
import csv
import time
import httplib2
import StringIO
import os

from google.appengine.api import logservice
from google.appengine.api import files

from apiclient.discovery import build
from oauth2client.appengine import AppAssertionCredentials

from mapreduce import base_handler, mapreduce_pipeline
from mapreduce.lib import pipeline

import config

# Module-level BigQuery client: authorize an HTTP channel with the app's
# own service account (AppAssertionCredentials) and build the v2 API client.
# The BigQuery OAuth scope is required for both job insertion and polling.
credentials = AppAssertionCredentials(
    scope='https://www.googleapis.com/auth/bigquery')
http = credentials.authorize(http=httplib2.Http())
service = build('bigquery', 'v2', http=http)

class Log2Bq(base_handler.PipelineBase):
  """A pipeline to ingest log as CSV in Google Big Query."""

  def run(self, start_time, end_time, version_ids):
    """Two-stage ingestion: logs -> Google Storage -> BigQuery.

    Args:
      start_time: epoch seconds, lower bound of the log window.
      end_time: epoch seconds, upper bound of the log window.
      version_ids: list of app version ids whose logs are exported.
    """
    # Stage 1: dump the request logs to Google Storage as CSV files.
    files = yield Log2Gs(start_time, end_time, version_ids)
    # Stage 2: load those gs:// files into BigQuery with a `load` job.
    yield Gs2Bq(files)

class Log2Gs(base_handler.PipelineBase):
  """A pipeline to ingest log as CSV in Google Storage."""

  def run(self, start_time, end_time, version_ids):
    """Map request logs to CSV files in the configured GS bucket.

    Uses LogInputReader to stream RequestLog records for the given
    window/versions, the `log2csv` mapper to serialize each record, and
    FileOutputWriter targeting the "gs" filesystem for output.
    """
    # NOTE(review): the mapper spec assumes this module is deployed as
    # `main` — confirm against app.yaml.
    yield mapreduce_pipeline.MapperPipeline(
        "log2bq",
        "main.log2csv",
        "mapreduce.input_readers.LogInputReader",
        "mapreduce.output_writers.FileOutputWriter",
        params={
            "input_reader": {
                "start_time": start_time,
                "end_time": end_time,
                "version_ids": version_ids,
            },
            "output_writer": {
                "filesystem": "gs",
                "gs_bucket_name": config.gs_bucket_name,
            },
        },
        shards=16)

# Mapper function that converts a request log object to one CSV line.
def log2csv(r):
  """Convert log API RequestLog object to csv.

  Args:
    r: a logservice RequestLog record.

  Yields:
    A single CSV-formatted line (with trailing newline) containing the
    request start time, method, resource, status, latency, response size
    and user agent — "NULL" when the user agent is absent.
  """
  s = StringIO.StringIO()
  w = csv.writer(s)
  # Field order must match config.bigquery_schema.
  w.writerow([r.start_time, r.method, r.resource,
              r.status, r.latency, r.response_size,
              r.user_agent if r.user_agent else "NULL"])
  line = s.getvalue()
  yield line

# Create a pipeline that takes gs:// files as argument and ingest them
# using a Big Query `load` job.
class Gs2Bq(base_handler.PipelineBase):
  """A pipeline to ingest log csv from Google Storage to Google BigQuery."""

  def run(self, files):
    """Insert a BigQuery load job over `files`, then poll it to completion.

    Args:
      files: list of /gs/<bucket>/<object> paths written by FileOutputWriter.
    """
    jobs = service.jobs()
    # FileOutputWriter reports /gs/... paths; BigQuery expects gs:// URIs.
    gs_paths = [f.replace('/gs/', 'gs://') for f in files]
    result = jobs.insert(projectId=config.project_id,
                         body={'projectId': config.project_id,
                               'configuration': {
                                   'load': {
                                       'sourceUris': gs_paths,
                                       'schema': {
                                           'fields': config.bigquery_schema,
                                       },
                                       'destinationTable': {
                                           'projectId': config.project_id,
                                           'datasetId': config.bigquery_dataset_id,
                                           'tableId': config.bigquery_table_id,
                                       },
                                       'createDisposition': 'CREATE_IF_NEEDED',
                                       'writeDisposition': 'WRITE_TRUNCATE',
                                   },
                               }}).execute()
    # Chain a polling pipeline on the new job's id so this pipeline only
    # finishes once the load job reaches a terminal state.
    yield BqCheck(result['jobReference']['jobId'])

# Create a pipeline that checks for a Big Query job status.
class BqCheck(base_handler.PipelineBase):
  """A pipeline to check for Big Query job status."""

  def run(self, job):
    """Poll the BigQuery job `job`; re-check after a delay until done.

    Args:
      job: the BigQuery job id to poll.
    """
    jobs = service.jobs()
    status = jobs.get(projectId=config.project_id,
                      jobId=job).execute()
    job_state = status['status']['state']
    if job_state == 'PENDING' or job_state == 'RUNNING':
      # Not finished yet: wait, then spawn another check on the same job.
      delay = yield pipeline.common.Delay(seconds=1)
      with pipeline.After(delay):
        yield BqCheck(job)
    else:
      # Terminal state (DONE, possibly with errors): surface the full
      # job status dict as the pipeline's return value.
      yield pipeline.common.Return(status)

# Handler that launches the pipeline and redirects to the pipeline UI.
class Level4Handler(webapp2.RequestHandler):
  """Starts the Log2Bq pipeline and redirects to its status page."""

  def get(self):
    now = time.time()
    # Ingest the last hour of request logs for app version "1".
    p = Log2Bq(now - 3600, now, ["1"])
    # start() is required: root_pipeline_id only exists once started.
    p.start()
    self.redirect('/mapreduce/pipeline/status?root=%s' % p.root_pipeline_id)

# WSGI application routing /level4 to the pipeline-launching handler.
app = webapp2.WSGIApplication([('/level4', Level4Handler)], debug=True)