Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> 

# 

# This file is part of Ansible 

# 

# Ansible is free software: you can redistribute it and/or modify 

# it under the terms of the GNU General Public License as published by 

# the Free Software Foundation, either version 3 of the License, or 

# (at your option) any later version. 

# 

# Ansible is distributed in the hope that it will be useful, 

# but WITHOUT ANY WARRANTY; without even the implied warranty of 

# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

# GNU General Public License for more details. 

# 

# You should have received a copy of the GNU General Public License 

# along with Ansible. If not, see <http://www.gnu.org/licenses/>. 

 

# Make coding more python3-ish 

from __future__ import (absolute_import, division, print_function) 

__metaclass__ = type 

 

import multiprocessing 

import os 

import tempfile 

 

from ansible import constants as C 

from ansible.errors import AnsibleError 

from ansible.executor.play_iterator import PlayIterator 

from ansible.executor.stats import AggregateStats 

from ansible.executor.task_result import TaskResult 

from ansible.module_utils.six import string_types 

from ansible.module_utils._text import to_text 

from ansible.playbook.block import Block 

from ansible.playbook.play_context import PlayContext 

from ansible.plugins.loader import callback_loader, strategy_loader, module_loader 

from ansible.plugins.callback import CallbackBase 

from ansible.template import Templar 

from ansible.utils.helpers import pct_to_int 

from ansible.vars.hostvars import HostVars 

from ansible.vars.reserved import warn_if_reserved 

 

try: 

from __main__ import display 

except ImportError: 

from ansible.utils.display import Display 

display = Display() 

 

 

__all__ = ['TaskQueueManager'] 

 

 

class TaskQueueManager: 

 

''' 

This class handles the multiprocessing requirements of Ansible by 

creating a pool of worker forks, a result handler fork, and a 

manager object with shared datastructures/queues for coordinating 

work between all processes. 

 

The queue manager is responsible for loading the play strategy plugin, 

which dispatches the Play's tasks to hosts. 

''' 

 

RUN_OK = 0 

RUN_ERROR = 1 

RUN_FAILED_HOSTS = 2 

RUN_UNREACHABLE_HOSTS = 4 

RUN_FAILED_BREAK_PLAY = 8 

RUN_UNKNOWN_ERROR = 255 

 

def __init__(self, inventory, variable_manager, loader, options, passwords, stdout_callback=None, run_additional_callbacks=True, run_tree=False): 

 

self._inventory = inventory 

self._variable_manager = variable_manager 

self._loader = loader 

self._options = options 

self._stats = AggregateStats() 

self.passwords = passwords 

self._stdout_callback = stdout_callback 

self._run_additional_callbacks = run_additional_callbacks 

self._run_tree = run_tree 

 

self._callbacks_loaded = False 

self._callback_plugins = [] 

self._start_at_done = False 

 

# make sure any module paths (if specified) are added to the module_loader 

88 ↛ 89line 88 didn't jump to line 89, because the condition on line 88 was never true if options.module_path: 

for path in options.module_path: 

if path: 

module_loader.add_directory(path) 

 

# a special flag to help us exit cleanly 

self._terminated = False 

 

# this dictionary is used to keep track of notified handlers 

self._notified_handlers = dict() 

self._listening_handlers = dict() 

 

# dictionaries to keep track of failed/unreachable hosts 

self._failed_hosts = dict() 

self._unreachable_hosts = dict() 

 

self._final_q = multiprocessing.Queue() 

 

# A temporary file (opened pre-fork) used by connection 

# plugins for inter-process locking. 

self._connection_lockfile = tempfile.TemporaryFile() 

 

def _initialize_processes(self, num): 

self._workers = [] 

 

for i in range(num): 

rslt_q = multiprocessing.Queue() 

self._workers.append([None, rslt_q]) 

 

def _initialize_notified_handlers(self, play): 

''' 

Clears and initializes the shared notified handlers dict with entries 

for each handler in the play, which is an empty array that will contain 

inventory hostnames for those hosts triggering the handler. 

''' 

 

# Zero the dictionary first by removing any entries there. 

# Proxied dicts don't support iteritems, so we have to use keys() 

self._notified_handlers.clear() 

self._listening_handlers.clear() 

 

def _process_block(b): 

temp_list = [] 

for t in b.block: 

if isinstance(t, Block): 

temp_list.extend(_process_block(t)) 

else: 

temp_list.append(t) 

return temp_list 

 

handler_list = [] 

139 ↛ 140line 139 didn't jump to line 140, because the loop on line 139 never started for handler_block in play.handlers: 

handler_list.extend(_process_block(handler_block)) 

# then initialize it with the given handler list 

self.update_handler_list(handler_list) 

 

def update_handler_list(self, handler_list): 

145 ↛ 146line 145 didn't jump to line 146, because the loop on line 145 never started for handler in handler_list: 

if handler._uuid not in self._notified_handlers: 

display.debug("Adding handler %s to notified list" % handler.name) 

self._notified_handlers[handler._uuid] = [] 

if handler.listen: 

listeners = handler.listen 

if not isinstance(listeners, list): 

listeners = [listeners] 

for listener in listeners: 

if listener not in self._listening_handlers: 

self._listening_handlers[listener] = [] 

display.debug("Adding handler %s to listening list" % handler.name) 

self._listening_handlers[listener].append(handler._uuid) 

 

def load_callbacks(self): 

''' 

Loads all available callbacks, with the exception of those which 

utilize the CALLBACK_TYPE option. When CALLBACK_TYPE is set to 'stdout', 

only one such callback plugin will be loaded. 

''' 

 

166 ↛ 167line 166 didn't jump to line 167, because the condition on line 166 was never true if self._callbacks_loaded: 

return 

 

stdout_callback_loaded = False 

170 ↛ 173line 170 didn't jump to line 173, because the condition on line 170 was never false if self._stdout_callback is None: 

self._stdout_callback = C.DEFAULT_STDOUT_CALLBACK 

 

173 ↛ 174line 173 didn't jump to line 174, because the condition on line 173 was never true if isinstance(self._stdout_callback, CallbackBase): 

stdout_callback_loaded = True 

175 ↛ 188line 175 didn't jump to line 188, because the condition on line 175 was never false elif isinstance(self._stdout_callback, string_types): 

176 ↛ 177line 176 didn't jump to line 177, because the condition on line 176 was never true if self._stdout_callback not in callback_loader: 

raise AnsibleError("Invalid callback for stdout specified: %s" % self._stdout_callback) 

else: 

self._stdout_callback = callback_loader.get(self._stdout_callback) 

try: 

self._stdout_callback.set_options() 

except AttributeError: 

display.deprecated("%s stdout callback, does not support setting 'options', it will work for now, " 

" but this will be required in the future and should be updated," 

" see the 2.4 porting guide for details." % self._stdout_callback._load_name, version="2.9") 

stdout_callback_loaded = True 

else: 

raise AnsibleError("callback must be an instance of CallbackBase or the name of a callback plugin") 

 

for callback_plugin in callback_loader.all(class_only=True): 

191 ↛ 208line 191 didn't jump to line 208, because the condition on line 191 was never false if hasattr(callback_plugin, 'CALLBACK_VERSION') and callback_plugin.CALLBACK_VERSION >= 2.0: 

# we only allow one callback of type 'stdout' to be loaded, so check 

# the name of the current plugin and type to see if we need to skip 

# loading this callback plugin 

callback_type = getattr(callback_plugin, 'CALLBACK_TYPE', None) 

callback_needs_whitelist = getattr(callback_plugin, 'CALLBACK_NEEDS_WHITELIST', False) 

(callback_name, _) = os.path.splitext(os.path.basename(callback_plugin._original_path)) 

if callback_type == 'stdout': 

199 ↛ 200,   199 ↛ 2012 missed branches: 1) line 199 didn't jump to line 200, because the condition on line 199 was never true, 2) line 199 didn't jump to line 201, because the condition on line 199 was never false if callback_name != self._stdout_callback or stdout_callback_loaded: 

continue 

stdout_callback_loaded = True 

202 ↛ 203line 202 didn't jump to line 203, because the condition on line 202 was never true elif callback_name == 'tree' and self._run_tree: 

pass 

elif not self._run_additional_callbacks or (callback_needs_whitelist and ( 

C.DEFAULT_CALLBACK_WHITELIST is None or callback_name not in C.DEFAULT_CALLBACK_WHITELIST)): 

continue 

 

callback_obj = callback_plugin() 

try: 

callback_obj.set_options() 

except AttributeError: 

display.deprecated("%s callback, does not support setting 'options', it will work for now, " 

" but this will be required in the future and should be updated, " 

" see the 2.4 porting guide for details." % self._stdout_callback._load_name, version="2.9") 

self._callback_plugins.append(callback_obj) 

 

self._callbacks_loaded = True 

 

def run(self, play): 

''' 

Iterates over the roles/tasks in a play, using the given (or default) 

strategy for queueing tasks. The default is the linear strategy, which 

operates like classic Ansible by keeping all hosts in lock-step with 

a given task (meaning no hosts move on to the next task until all hosts 

are done with the current task). 

''' 

 

228 ↛ 229line 228 didn't jump to line 229, because the condition on line 228 was never true if not self._callbacks_loaded: 

self.load_callbacks() 

 

all_vars = self._variable_manager.get_vars(play=play) 

warn_if_reserved(all_vars) 

templar = Templar(loader=self._loader, variables=all_vars) 

 

new_play = play.copy() 

new_play.post_validate(templar) 

new_play.handlers = new_play.compile_roles_handlers() + new_play.handlers 

 

self.hostvars = HostVars( 

inventory=self._inventory, 

variable_manager=self._variable_manager, 

loader=self._loader, 

) 

 

play_context = PlayContext(new_play, self._options, self.passwords, self._connection_lockfile.fileno()) 

for callback_plugin in self._callback_plugins: 

247 ↛ 246line 247 didn't jump to line 246, because the condition on line 247 was never false if hasattr(callback_plugin, 'set_play_context'): 

callback_plugin.set_play_context(play_context) 

 

self.send_callback('v2_playbook_on_play_start', new_play) 

 

# initialize the shared dictionary containing the notified handlers 

self._initialize_notified_handlers(new_play) 

 

# build the iterator 

iterator = PlayIterator( 

inventory=self._inventory, 

play=new_play, 

play_context=play_context, 

variable_manager=self._variable_manager, 

all_vars=all_vars, 

start_at_done=self._start_at_done, 

) 

 

# adjust to # of workers to configured forks or size of batch, whatever is lower 

self._initialize_processes(min(self._options.forks, iterator.batch_size)) 

 

# load the specified strategy (or the default linear one) 

strategy = strategy_loader.get(new_play.strategy, self) 

270 ↛ 271line 270 didn't jump to line 271, because the condition on line 270 was never true if strategy is None: 

raise AnsibleError("Invalid play strategy specified: %s" % new_play.strategy, obj=play._ds) 

 

# Because the TQM may survive multiple play runs, we start by marking 

# any hosts as failed in the iterator here which may have been marked 

# as failed in previous runs. Then we clear the internal list of failed 

# hosts so we know what failed this round. 

277 ↛ 278line 277 didn't jump to line 278, because the loop on line 277 never started for host_name in self._failed_hosts.keys(): 

host = self._inventory.get_host(host_name) 

iterator.mark_host_failed(host) 

 

self.clear_failed_hosts() 

 

# during initialization, the PlayContext will clear the start_at_task 

# field to signal that a matching task was found, so check that here 

# and remember it so we don't try to skip tasks on future plays 

286 ↛ 287line 286 didn't jump to line 287, because the condition on line 286 was never true if getattr(self._options, 'start_at_task', None) is not None and play_context.start_at_task is None: 

self._start_at_done = True 

 

# and run the play using the strategy and cleanup on way out 

play_return = strategy.run(iterator, play_context) 

 

# now re-save the hosts that failed from the iterator to our internal list 

293 ↛ 294line 293 didn't jump to line 294, because the loop on line 293 never started for host_name in iterator.get_failed_hosts(): 

self._failed_hosts[host_name] = True 

 

strategy.cleanup() 

self._cleanup_processes() 

return play_return 

 

def cleanup(self): 

display.debug("RUNNING CLEANUP") 

self.terminate() 

self._final_q.close() 

self._cleanup_processes() 

 

def _cleanup_processes(self): 

307 ↛ exitline 307 didn't return from function '_cleanup_processes', because the condition on line 307 was never false if hasattr(self, '_workers'): 

for (worker_prc, rslt_q) in self._workers: 

rslt_q.close() 

310 ↛ 308line 310 didn't jump to line 308, because the condition on line 310 was never false if worker_prc and worker_prc.is_alive(): 

try: 

worker_prc.terminate() 

except AttributeError: 

pass 

 

def clear_failed_hosts(self): 

self._failed_hosts = dict() 

 

def get_inventory(self): 

return self._inventory 

 

def get_variable_manager(self): 

return self._variable_manager 

 

def get_loader(self): 

return self._loader 

 

def get_workers(self): 

return self._workers[:] 

 

def terminate(self): 

self._terminated = True 

 

def has_dead_workers(self): 

 

# [<WorkerProcess(WorkerProcess-2, stopped[SIGKILL])>, 

# <WorkerProcess(WorkerProcess-2, stopped[SIGTERM])> 

 

defunct = False 

for (idx, x) in enumerate(self._workers): 

341 ↛ 340line 341 didn't jump to line 340, because the condition on line 341 was never false if hasattr(x[0], 'exitcode'): 

342 ↛ 343line 342 didn't jump to line 343, because the condition on line 342 was never true if x[0].exitcode in [-9, -11, -15]: 

defunct = True 

return defunct 

 

def send_callback(self, method_name, *args, **kwargs): 

for callback_plugin in [self._stdout_callback] + self._callback_plugins: 

# a plugin that set self.disabled to True will not be called 

# see osx_say.py example for such a plugin 

if getattr(callback_plugin, 'disabled', False): 

continue 

 

# try to find v2 method, fallback to v1 method, ignore callback if no method found 

methods = [] 

for possible in [method_name, 'v2_on_any']: 

gotit = getattr(callback_plugin, possible, None) 

357 ↛ 358line 357 didn't jump to line 358, because the condition on line 357 was never true if gotit is None: 

gotit = getattr(callback_plugin, possible.replace('v2_', ''), None) 

359 ↛ 355line 359 didn't jump to line 355, because the condition on line 359 was never false if gotit is not None: 

methods.append(gotit) 

 

# send clean copies 

new_args = [] 

for arg in args: 

# FIXME: add play/task cleaners 

if isinstance(arg, TaskResult): 

new_args.append(arg.clean_copy()) 

# elif isinstance(arg, Play): 

# elif isinstance(arg, Task): 

else: 

new_args.append(arg) 

 

for method in methods: 

try: 

method(*new_args, **kwargs) 

except Exception as e: 

# TODO: add config toggle to make this fatal or not? 

display.warning(u"Failure using method (%s) in callback plugin (%s): %s" % (to_text(method_name), to_text(callback_plugin), to_text(e))) 

from traceback import format_tb 

from sys import exc_info 

display.vvv('Callback Exception: \n' + ' '.join(format_tb(exc_info()[2])))