2 # -*- coding: utf-8 -*-
3 # This script generates hundreds of random keypresses per second
4 # and sends them to the LyX window.
5 # It requires xvkbd and wmctrl
6 # It generates a log of the KEYCODES it sends as development/keystest/out/KEYCODES
13 #from subprocess import call
# Announce start-up on stdout (Python 2 print statement).
16 print 'Beginning keytest.py'
# Write-only handle on /dev/null; it is passed as stdout/stderr of the xvkbd
# invocation further down so that xvkbd's own output is discarded.
18 FNULL = open('/dev/null', 'w')
# Builds the table of key strings that random commands are drawn from.
# Every loop below walks k over 97..122, i.e. chr(k) is 'a'..'z'.
39 for k in range(97, 123):
# Slice-assigning to keycode[:0] inserts the new entry at the FRONT of the
# list. "\A" + letter is an Alt+letter chord in xvkbd's -text syntax.
42 for k in range(97, 123):
43 keycode[:0] = ["\A" + chr(k)]
# The same "\A" entries are inserted a second time, so each appears twice.
# NOTE(review): possibly deliberate, to weight Alt chords more heavily in the
# uniform random pick -- confirm.
45 for k in range(97, 123):
46 keycode[:0] = ["\A" + chr(k)]
# "\C" + letter is a Control+letter chord in xvkbd's -text syntax.
48 for k in range(97, 123):
49 keycode[:0] = ["\C" + chr(k)]
# Keep the finished table on the instance for later random selection.
51 self.keycode = keycode
# Produces the next randomly generated command.
# Count every command handed out.
56 self.count = self.count + 1
# Every 200th command is handled by a separate branch.
57 if self.count % 200 == 0:
# A further branch applies once the counter has passed self.count_max.
59 elif self.count > self.count_max:
# random.randint is inclusive at both ends, so randint(1, len) - 1 is a
# uniformly chosen valid index 0..len-1 into self.keycode.
64 keystr = keystr + self.keycode[random.randint(1,
65 len(self.keycode)) - 1]
# The 'KK: ' prefix marks a key-string command; the dispatch code at the end
# of the file strips these four characters and passes the rest to
# sendKeystring.
66 return 'KK: ' + keystr
# Command source that replays the lines of a file, after first randomly
# dropping some of them.
69 class CommandSourceFromFile(CommandSource):
# filename: path of the file whose lines are replayed.
# p: drop probability (see the comment on p below); printed next to self.p.
71 def __init__(self, filename, p):
# The whole file is read into memory up front; the handle stays on self.infile.
73 self.infile = open(filename, 'r')
74 self.lines = self.infile.readlines()
# Debug output of the requested and the stored drop probability.
78 print p, self.p, 'self.p'
83 # Now we start randomly dropping lines, which we hope are redundant
84 # p is the probability that any given line will be removed
# Coin flip (probability 0.5) selecting a dropping strategy; the chosen
# strategy's name is printed before it runs.
87 if random.uniform(0, 1) < 0.5:
88 print 'randomdrop_independant\n'
89 self.randomdrop_independant()
# presumably the alternative branch of the coin flip -- confirm
91 print 'randomdrop_slice\n'
92 self.randomdrop_slice()
# screenshot_out is a module-level name read from the SCREENSHOT_OUT
# environment variable; it is None when that variable is unset.
93 if screenshot_out is None:
# Cap on the number of commands getCommand will hand out: 20 more than the
# number of remaining lines, but never fewer than count_atleast.
97 self.max_count = max(len(self.lines) + 20, count_atleast)
# Guard for the case where no lines are left.
98 if len(self.lines) < 1:
# Strategy 1: consider each line separately and drop it with probability
# self.p.
101 def randomdrop_independant(self):
104 # The next couple of lines are to ensure that at least one line is dropped
# Index of one line picked uniformly at random.
# NOTE(review): random.randint(0, -1) raises ValueError if self.lines is empty.
106 drop = random.randint(0, len(self.lines) - 1)
108 #p = p - 1 / len(self.lines)
# Binds a second name to the same list object; no copy is made here.
109 origlines = self.lines
# Per-line random test against the drop probability; a dropped line is logged.
112 if random.uniform(0, 1) < self.p:
113 print 'Randomly dropping line ' + l + '\n'
# Strategy 2: delete one contiguous run of lines around a random position.
120 def randomdrop_slice(self):
# With probability 0.4, append a copy of the first line to the end.
122 if random.uniform(0, 1) < 0.4:
123 lines.append(lines[0])
125 num_lines = len(lines)
# NOTE(review): max_drop is computed here but no visible line of this method
# reads it; num_drop below is drawn from 1..5 regardless -- confirm.
126 max_drop = max(5, num_lines / 5)
127 num_drop = random.randint(1, 5)
# Midpoint may be anywhere from 0 to num_lines inclusive.
128 drop_mid = random.randint(0, num_lines)
# Centre the run on drop_mid and clamp both ends to the list bounds
# (num_drop / 2 is integer division under Python 2, assuming no
# __future__ division import).
129 drop_start = max(drop_mid - num_drop / 2, 0)
130 drop_end = min(drop_start + num_drop, num_lines)
# Debug output of the chosen slice.
131 print drop_start, drop_mid, drop_end
133 del lines[drop_start:drop_end]
# Returns the next line of the file as a command string.
137 def getCommand(self):
# Checked first: has the command budget (self.max_count) been used up?
138 if self.count >= self.max_count:
# Ran past the last line: record another pass over the file.
140 if self.i >= len(self.lines):
141 self.loops = self.loops + 1
146 line = self.lines[self.i]
147 self.count = self.count + 1
149 #print 'Line read: <<' + line + '>>\n'
# Progress marker: one 'r' on stdout per line replayed.
150 sys.stdout.write('r')
# Strip the trailing newline and any other trailing whitespace.
151 return line.rstrip('\n').rstrip()
# Checks the LyX process's procfs status file for the text '(sleeping)'.
# lyx_pid is the module-level value read from the LYX_PID environment variable.
155 fname = '/proc/' + lyx_pid + '/status'
# The status file does not exist, i.e. there is no /proc entry for that PID.
156 if not os.path.exists(fname):
159 lines = f.readlines()
# True when '(sleeping)' occurs in the SECOND line of the file at an offset
# greater than 0.
# NOTE(review): this assumes the "State:" line is line 2; on kernels that
# print a "Umask:" line before it, "State:" is line 3 -- confirm on the
# target systems.
160 sleeping = lines[1].find('(sleeping)') > 0
162 # print 'LYX_STATE', lines[1] , 'SLEEPING=', sleeping
# Sends one key string to the X display with xvkbd, waiting for LyX to be
# idle (per lyx_sleeping()) first.
# keystr: text in xvkbd's -text syntax.
# LYX_PID: PID of the LyX process, as supplied by the callers.
167 def sendKeystring(keystr, LYX_PID):
169 # print "sending keystring "+keystr+"\n"
# Log key strings that contain no word character ([A-Za-z0-9_]) at all.
171 if not re.match(".*\w.*", keystr):
172 print 'print .' + keystr + '.\n'
# Poll until LyX reports sleeping, printing a '.' per iteration.
174 before_secs = time.time()
175 while not lyx_sleeping():
177 sys.stdout.write('.')
# Treat more than 180 seconds of waiting as a freeze.
178 if time.time() - before_secs > 180:
179 print 'Killing due to freeze (KILL_FREEZE)'
181 # Do profiling, but sysprof has no command line interface?
182 # os.system("killall -KILL lyx")
# Screenshot mode: only when SCREENSHOT_OUT was set in the environment.
185 if not screenshot_out is None:
# Wait again for LyX to become idle before taking the screenshot.
186 while not lyx_sleeping():
189 print 'Making Screenshot: ' + screenshot_out + ' OF ' + infilename
# Capture the whole root window to <screenshot_out><x.count>.png using the
# external "import" command (presumably ImageMagick's -- confirm).
191 os.system('import -window root '+screenshot_out+str(x.count)+".png")
# xvkbd argument list: -delay takes the current global DELAY value, -text the
# key string; xvkbd's stdout and stderr are sent to /dev/null via FNULL.
195 ["xvkbd", "-xsendevent", "-delay", DELAY, "-text", keystr],
196 stdout=FNULL,stderr=FNULL
# One-character progress markers on stdout.
198 sys.stdout.write('*')
200 sys.stdout.write('X')
# Retry helper for a shell command.
# num_retry: maximum number of attempts.
# cmd: the command line, as a single string.
202 def system_retry(num_retry, cmd):
# Keep looping while attempts remain and the last result rtn is non-zero
# (rtn is presumably the exit status of running cmd -- confirm).
205 while ( ( i < num_retry ) and ( rtn != 0) ):
# Report the command that failed.
210 print "Command Failed: "+cmd
# Diagnostics: echo the x-session-manager PID ($X_PID is expanded by the
# shell) and the number of ICE-unix entries lsof lists for that process.
215 os.system("echo x-session-manager PID: $X_PID.")
216 os.system("echo x-session-manager open files: `lsof -p $X_PID | grep ICE-unix | wc -l`")
217 ####os.system("wmctrl -l | ( grep '"+lyx_window_name+"' || ( killall lyx ; sleep 1 ; killall -9 lyx ))")
218 #os.system("wmctrl -R '"+lyx_window_name+"' ;sleep 0.1")
# Bring the LyX window to the front with wmctrl -R, via system_retry with a
# limit of 30 attempts.
219 system_retry(30, "wmctrl -R '"+lyx_window_name+"'")
# Runtime configuration is taken from environment variables.
# os.environ.get returns None for any variable that is not set.
222 lyx_pid = os.environ.get('LYX_PID')
# NOTE(review): string concatenation with None raises TypeError, so this
# line fails when LYX_PID is unset.
223 print 'lyx_pid: ' + lyx_pid + '\n'
224 infilename = os.environ.get('KEYTEST_INFILE')
225 outfilename = os.environ.get('KEYTEST_OUTFILE')
226 max_drop = os.environ.get('MAX_DROP')
227 lyx_window_name = os.environ.get('LYX_WINDOW_NAME')
228 screenshot_out = os.environ.get('SCREENSHOT_OUT')
# Key string used to open a new file; defaults to "\Afn".
230 file_new_command = os.environ.get('FILE_NEW_COMMAND')
231 if file_new_command is None:
232 file_new_command = "\Afn"
# Key string used to reset LyX; defaults to four Escape presses followed by
# the file-new command.
234 ResetCommand = os.environ.get('RESET_COMMAND')
235 if ResetCommand is None:
236 ResetCommand = "\[Escape]\[Escape]\[Escape]\[Escape]" + file_new_command
237 #ResetCommand="\[Escape]\[Escape]\[Escape]\[Escape]\Cw\Cw\Cw\Cw\Cw\Afn"
# Window title handed to wmctrl; defaults to 'LyX'.
239 if lyx_window_name is None:
240 lyx_window_name = 'LyX'
# NOTE(review): as with lyx_pid above, these two prints raise TypeError when
# KEYTEST_OUTFILE or MAX_DROP is unset.
242 print 'outfilename: ' + outfilename + '\n'
243 print 'max_drop: ' + max_drop + '\n'
# Select the command source x according to whether KEYTEST_INFILE was given.
245 if infilename is None:
246 print 'infilename is None\n'
# No input file: the message indicates the plain CommandSource is used.
248 print 'Using x=CommandSource\n'
250 print 'infilename: ' + infilename + '\n'
# Draw the per-line drop probability uniformly between 0 and MAX_DROP
# (MAX_DROP is converted from its environment string with float()).
251 probability_we_drop_a_command = random.uniform(0, float(max_drop))
252 print 'probability_we_drop_a_command: '
253 print '%s' % probability_we_drop_a_command
# Replay the input file, with lines dropped at that probability.
255 x = CommandSourceFromFile(infilename, probability_we_drop_a_command)
256 print 'Using x=CommandSourceFromFile\n'
# Command dispatch: each command string c is written to the output file and
# acted on according to its four-character prefix.
# The output file is named by KEYTEST_OUTFILE and opened for writing.
258 outfile = open(outfilename, 'w')
# NOTE(review): this sends the literal "\Afn" (the default value of
# file_new_command) rather than file_new_command itself, so a
# FILE_NEW_COMMAND override is not honoured here -- confirm intent.
261 sendKeystring("\Afn", lyx_pid)
262 write_commands = True
265 #os.system('echo -n LOADAVG:; cat /proc/loadavg')
# outfile is rebound to a second file named <outfilename>+.
269 outfile = open(outfilename + '+', 'w')
# writelines() receives a single string here, so it writes that string's
# characters in order -- the same result as write().
271 outfile.writelines(c + '\n')
# 'KK: <keys>': send the keys, but only while /proc/<lyx_pid>/status exists.
276 elif c[0:4] == 'KK: ':
277 if os.path.exists('/proc/' + lyx_pid + '/status'):
278 sendKeystring(c[4:], lyx_pid)
280 ##os.system('killall lyx; sleep 2 ; killall -9 lyx')
281 print 'No path /proc/' + lyx_pid + '/status, exiting'
# 'KD: <delay>': set DELAY, the value passed to xvkbd's -delay option.
283 elif c[0:4] == 'KD: ':
284 DELAY = c[4:].rstrip('\n')
285 print 'Setting DELAY to ' + DELAY + '.'
# Send the reset key string (RESET_COMMAND or its default).
288 sendKeystring(ResetCommand, lyx_pid)
# Report a command that was not recognised.
290 print "Unrecognised Command '" + c + "'\n"