import webapp2
import csv
import time
import httplib2
import StringIO
import os

from google.appengine.api import logservice
from google.appengine.api import files

from apiclient.discovery import build
from oauth2client.appengine import AppAssertionCredentials

from mapreduce import base_handler, mapreduce_pipeline
from mapreduce.lib import pipeline

import config

# Obtain App Engine service-account credentials scoped for BigQuery and
# build an authorized BigQuery v2 API client used by the pipelines below.
credentials = AppAssertionCredentials(
    scope='https://www.googleapis.com/auth/bigquery')
http = credentials.authorize(http=httplib2.Http())
service = build('bigquery', 'v2', http=http)

class Log2Bq(base_handler.PipelineBase):
  """Top-level pipeline: export request logs to CSV and load them into BigQuery.

  Chains two child pipelines: Log2Gs writes the logs as CSV files to
  Google Storage, then Gs2Bq ingests those files into BigQuery.
  """
  def run(self, start_time, end_time, version_ids):
    # The yielded Log2Gs future resolves to the list of /gs/ file paths.
    gs_files = yield Log2Gs(start_time, end_time, version_ids)
    yield Gs2Bq(gs_files)

class Log2Gs(base_handler.PipelineBase):
  """A pipeline to ingest log as CSV in Google Storage.

  Runs a MapperPipeline over the logservice records for the given time
  window and app versions, writing the mapper output to a Google Storage
  bucket. Resolves to the list of generated /gs/ file paths.
  """
  def run(self, start_time, end_time, version_ids):
    # Create a MapperPipeline w/ `LogInputReader`, `FileOutputWriter`.
    # NOTE(review): the mapper spec assumes this module is named `main` —
    # confirm against app.yaml / the actual module name.
    yield mapreduce_pipeline.MapperPipeline(
        "log2gs",
        "main.log2csv",
        "mapreduce.input_readers.LogInputReader",
        "mapreduce.output_writers.FileOutputWriter",
        params={
            "input_reader": {
                "start_time": start_time,
                "end_time": end_time,
                "version_ids": version_ids,
            },
            "output_writer": {
                "filesystem": "gs",
                "gs_bucket_name": config.gs_bucket_name,
            },
        },
        shards=16)
# Create a mapper function that convert request logs object to CSV.
def log2csv(r):
  """Convert log API RequestLog object to csv."""
  buf = StringIO.StringIO()
  writer = csv.writer(buf)
  # A missing/empty user agent is recorded as the literal string "NULL".
  user_agent = r.user_agent if r.user_agent else "NULL"
  writer.writerow([
      r.start_time,
      r.method,
      r.resource,
      r.status,
      r.latency,
      r.response_size,
      user_agent,
  ])
  yield buf.getvalue()

# Create a pipeline that takes gs:// files as argument and ingest them
# using a Big Query `load` job.
class Gs2Bq(base_handler.PipelineBase):
  """A pipeline to ingest log csv from Google Storage to Google BigQuery.

  Submits a BigQuery `load` job for the given /gs/ files, then yields a
  BqCheck child pipeline that polls the job until completion.
  """
  def run(self, files):
    jobs = service.jobs()
    # The mapreduce output writer reports paths as /gs/<bucket>/<file>;
    # BigQuery expects gs://<bucket>/<file> source URIs.
    gs_paths = [f.replace('/gs/', 'gs://') for f in files]
    result = jobs.insert(projectId=config.project_id,
                         body={'projectId': config.project_id,
                               'configuration': {
                                   'load': {
                                       'sourceUris': gs_paths,
                                       'schema': {
                                           'fields': config.bigquery_schema,
                                       },
                                       'destinationTable': {
                                           'projectId': config.project_id,
                                           'datasetId': config.bigquery_dataset_id,
                                           'tableId': config.bigquery_table_id,
                                       },
                                       # Append each run's logs to the table,
                                       # creating it on first use.
                                       'createDisposition': 'CREATE_IF_NEEDED',
                                       'writeDisposition': 'WRITE_APPEND',
                                   },
                               }}).execute()
    yield BqCheck(result['jobReference']['jobId'])

# Create a pipeline that check for a Big Query job status
class BqCheck(base_handler.PipelineBase):
  """A pipeline to check for Big Query job status.

  Polls the given BigQuery job id once per second until it leaves the
  PENDING/RUNNING states, then returns the final job status dict.
  """
  def run(self, job):
    jobs = service.jobs()
    status = jobs.get(projectId=config.project_id,
                      jobId=job).execute()
    job_state = status['status']['state']
    if job_state == 'PENDING' or job_state == 'RUNNING':
      # Still in flight: wait a second, then re-check via a child pipeline.
      delay = yield pipeline.common.Delay(seconds=1)
      with pipeline.After(delay):
        yield BqCheck(job)
    else:
      # Done (or errored): surface the final status to the parent pipeline.
      yield pipeline.common.Return(status)

# Create a handler that launches the pipeline and redirects to the
# pipeline UI.
class Level4Handler(webapp2.RequestHandler):
  """Launches the Log2Bq pipeline over the last 5 minutes of logs."""
  def get(self):
    now = time.time()
    # Ingest the trailing five minutes of request logs.
    start_time = now - 60 * 5
    # CURRENT_VERSION_ID is "<major>.<minor>"; logservice wants the major id.
    major, minor = os.environ["CURRENT_VERSION_ID"].split(".")
    p = Log2Bq(start_time, now, [major])
    # The pipeline must be started explicitly; without start() nothing runs
    # and root_pipeline_id is not assigned.
    p.start()
    self.redirect('/mapreduce/pipeline/status?root=%s' % p.root_pipeline_id)

# WSGI entry point: route /solution/level4 to the pipeline-launching handler.
app = webapp2.WSGIApplication([('/solution/level4', Level4Handler)], debug=True)