-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathQueryAnalysis.py
More file actions
185 lines (154 loc) · 7.76 KB
/
QueryAnalysis.py
File metadata and controls
185 lines (154 loc) · 7.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
import argparse
import calendar
from datetime import datetime
import glob
import os
import shutil
import subprocess
import sys
import gzip
from .utility import utility
from . import config
os.nice(19)
months = {'january': [1, 31],
'february': [2, 28],
'march': [3, 31],
'april': [4, 30],
'may': [5, 31],
'june': [6, 30],
'july': [7, 31],
'august': [8, 31],
'september': [9, 30],
'october': [10, 31],
'november': [11, 30],
'december': [12, 31]}
parser = argparse.ArgumentParser("This script extracts the raw log data (if "
+ "it was not already done), processes them"
+ " using the java application and unifies "
+ "the query types.")
parser.add_argument("--ignoreLock", "-i", help="Ignore locked file and "
+ "execute anyways", action="store_true")
parser.add_argument("--threads", "-t", default=6, type=int, help="The number "
+ "of threads to run the java program with (default 7).")
parser.add_argument("--logging", "-l", help="Enables file logging.",
action="store_true")
parser.add_argument("--noBotMetrics", "-b", help="Disables metric calculation"
+ " for bot queries.", action="store_true")
parser.add_argument("--noDynamicQueryTypes", "-d", help="Disables dynamic "
+ "generation of query types.", action="store_true")
parser.add_argument("--noGzipOutput", "-g", help="Disables gzipping of the "
+ "output files.", action="store_true")
parser.add_argument("--noExampleQueriesOutput", "-e", help="Disables the "
+ "matching of example queries.", action="store_true")
parser.add_argument("--withUniqueQueryDetection", "-u", help="Enable unique query detection", action="store_true")
parser.add_argument("--dbLocation", "-p", type = str, default = config.dbLocation, help = "The path of the uniqueQueriesMapDb file.")
parser.add_argument("--queryTypeMapLocation", "-q", type = str, default = config.queryTypeMapDbLocation, help = "The path of the query type map db file. Default is in the working directory.")
parser.add_argument("--monthsFolder", "-m", default=config.monthsFolder,
type=str,
help="The folder in which the months directory are "
+ "residing.")
parser.add_argument("--year", "-y", default=datetime.now().year, type=int,
help="The year to be processed (default current year).")
parser.add_argument("months", type=str, help="The months to be processed")
# These are the field we extract from wmf.wdqs_extract that form the raw
# log data. They are not configurable via argument because the java program
# does not detect headers and thus depends on this specific order.
fields = ["uri_query", "uri_path", "user_agent", "ts", "agent_type",
"hour", "http_status"]
header = ""
for field in fields:
header += field + "\t"
header = header[:-1] + "\n"
if (len(sys.argv[1:]) == 0):
parser.print_help()
parser.exit()
args = parser.parse_args()
if calendar.isleap(args.year):
months['february'][1] = 29
for monthName in args.months.split(","):
if os.path.isfile(utility.addMissingSlash(args.monthsFolder)
+ utility.addMissingSlash(monthName) + "locked") \
and not args.ignoreLock:
print("ERROR: The month " + monthName + " is being edited at the " \
+ "moment. Use -i if you want to force the execution of this script.")
sys.exit()
month = utility.addMissingSlash(os.path.abspath(utility.addMissingSlash(args.monthsFolder)
+ utility.addMissingSlash(monthName)))
processedLogDataDirectory = month + "processedLogData/"
rawLogDataDirectory = month + "rawLogData/"
tempDirectory = rawLogDataDirectory + "temp/"
# If the month directory does not exist it is being created along with
# the directories for raw and processed log data.
if not os.path.exists(month):
print(("Starting data extraction from wmf.wdqs_extract for "
+ monthName + "."))
os.makedirs(month)
os.makedirs(processedLogDataDirectory)
os.makedirs(rawLogDataDirectory)
# For each day we send a command to hive that extracts all entries for
# this day (in the given month and year) and writes them to temporary
# files.
for day in range(1, months[monthName][1] + 1):
arguments = ['hive', '-e']
os.makedirs(tempDirectory)
hive_call = 'insert overwrite local directory \'' + tempDirectory \
+ '\' row format delimited fields terminated ' \
+ 'by \'\\t\' select '
# We add all the fields to the request
for field in fields:
hive_call += field + ", "
hive_call = hive_call[:-2] + " "
########## NEEDS UPDATE! ##############
# This hive call is obsolete and doesn't return any results.
# The wmf.wdqs_extract data store is no longer the right place to look.
# The query needs to be updated based on the current internal data schema.
#
# The only fields that are used in the anonymization part of the code
# are |uri_query| and |ts|; the others are for analysis only.
#
# |uri_query| is assumed to have the form "?query=<URL-encoded query>"
hive_call += ' from wmf.wdqs_extract where uri_query<>"" ' \
+ 'and year=\'' + str(args.year) + '\' and month=\'' \
+ str(months[monthName][0]) + '\' and day=\'' + str(day) + '\''
arguments.append(hive_call)
if subprocess.call(arguments) != 0:
print(("ERROR: Raw data for month " + monthName + " does not "
+ "exist but could not be extracted using hive."))
sys.exit(1)
# The content of the temporary files is then copied to the actual
# raw log data file (with added headers)
with gzip.open(rawLogDataDirectory + "QueryCnt"
+ "%02d"%day + ".tsv.gz", "wb") as dayfile:
dayfile.write(header.encode('utf-8'))
for filename in glob.glob(tempDirectory + '*'):
with open(filename) as temp:
for line in temp:
dayfile.write(line.encode('utf-8'))
shutil.rmtree(tempDirectory)
# We build the call to execute the java application with the location of
# the files, the number of threads to use and any optional arguments needed
mavenCall = ['mvn', 'exec:java@QueryAnalysis']
mavenArguments = '-Dexec.args=-w ' + month + ' -t ' + str(args.threads) + ' -p ' + args.dbLocation + " -q " + args.queryTypeMapLocation
if args.logging:
mavenArguments += " -l"
if args.noBotMetrics:
mavenArguments += " -b"
if args.noDynamicQueryTypes:
mavenArguments += " -d"
if args.noGzipOutput:
mavenArguments += " -g"
if args.noExampleQueriesOutput:
mavenArguments += " -e"
if args.withUniqueQueryDetection:
mavenArguments += " -u"
mavenCall.append(mavenArguments)
owd = os.getcwd()
print("Starting data processing using QueryAnalysis for " + monthName + ".")
if subprocess.call(['mvn', 'clean', 'package']) != 0:
print("ERROR: Could not package the java application.")
sys.exit(1)
if subprocess.call(mavenCall) != 0:
print(("ERROR: Could not execute the java application. Check the logs "
+ "for details or rerun this script with -l to generate logs."))
sys.exit(1)
os.chdir(owd)