# -*- coding: utf-8 -*-
#
# This file is part of gaffer. See the NOTICE for more information.
"""
The process module wraps a process and handles its IO redirection.
"""
from datetime import timedelta
from functools import partial
import os
import signal
import shlex
import pyuv
import psutil
from psutil import AccessDenied
import six
from .events import EventEmitter
from .util import (bytestring, getcwd, check_uid, check_gid,
bytes2human, substitute_env, IS_WINDOWS)
from .sync import atomic_read, increment, decrement
# Executed once at import time, before any process is spawned. Per the libuv
# docs for uv_disable_stdio_inheritance, this marks the handles this process
# inherited from its own parent as non-inheritable, so spawned children do not
# accidentally receive them.
pyuv.Process.disable_stdio_inheritance()
def get_process_stats(process=None, interval=0):
    """Return CPU and memory statistics about a process.

    Args:

    - **process**: a ``psutil.Process`` object or an integer pid. If None,
      the statistics of the current process are returned.
    - **interval**: forwarded to ``get_cpu_percent(interval=...)``.

    Returns a dict with the keys:

    - ``mem_info1`` / ``mem_info2``: the first two fields of
      ``get_memory_info()`` formatted with :func:`bytes2human`
    - ``cpu``: the result of ``get_cpu_percent(interval=interval)``
    - ``mem``: the memory percent rounded to one decimal
    - ``ctime``: the total CPU time (sum of ``get_cpu_times()``) formatted
      as ``M:SS.cc`` -- total minutes, seconds and hundredths of a second

    A value psutil refuses to read (``AccessDenied``) is reported as
    ``"N/A"``. Any other psutil error (e.g. ``NoSuchProcess``) propagates.
    """
    if process is None:
        process = psutil.Process(os.getpid())
    elif isinstance(process, six.integer_types):
        # a bare pid is accepted, as documented: wrap it in a psutil handle
        process = psutil.Process(process)

    stats = {}

    try:
        mem_info = process.get_memory_info()
        stats['mem_info1'] = bytes2human(mem_info[0])
        stats['mem_info2'] = bytes2human(mem_info[1])
    except AccessDenied:
        stats['mem_info1'] = stats['mem_info2'] = "N/A"

    try:
        stats['cpu'] = process.get_cpu_percent(interval=interval)
    except AccessDenied:
        stats['cpu'] = "N/A"

    try:
        stats['mem'] = round(process.get_memory_percent(), 1)
    except AccessDenied:
        stats['mem'] = "N/A"

    try:
        cpu_times = process.get_cpu_times()
    except AccessDenied:
        stats['ctime'] = "N/A"
    else:
        ctime = timedelta(seconds=sum(cpu_times))
        # whole seconds including days; minutes are not wrapped at 60 so a
        # long-running process keeps its full CPU time instead of losing hours
        seconds = ctime.days * 86400 + ctime.seconds
        # zero-pad the microseconds before truncating: 50000us is ".05",
        # not ".50"
        stats['ctime'] = "%s:%s.%s" % (seconds // 60,
                                       str(seconds % 60).zfill(2),
                                       str(ctime.microseconds).zfill(6)[:2])
    return stats
class RedirectIO(object):
    """ Redirect the output pipes of a process and publish what is read.

    Each label in ``stdio`` (at most :attr:`pipes_count` of them) gets its
    own readable/writable pipe. Data read from a pipe is published to the
    listeners subscribed to that pipe's label. Slots without a label are
    filled with ignored stdio entries, so :attr:`stdio` always holds
    ``pipes_count`` items.
    """

    pipes_count = 2

    def __init__(self, loop, process, stdio=None):
        self.loop = loop
        self.process = process
        self._emitter = EventEmitter(loop)
        self._stdio = []
        self._channels = []

        # create (channel, stdio) pairs
        for label in (stdio or [])[:self.pipes_count]:
            # labels are free-form and may repeat: giving stdout and stderr
            # the same label sends both to the same listeners.
            p = pyuv.Pipe(loop)
            io = pyuv.StdIO(stream=p, flags=pyuv.UV_CREATE_PIPE |
                            pyuv.UV_READABLE_PIPE |
                            pyuv.UV_WRITABLE_PIPE)
            p.label = label
            self._channels.append(p)
            self._stdio.append(io)

        # fill the remaining slots with ignored stdio
        for _ in range(self.pipes_count - len(self._stdio)):
            self._stdio.append(pyuv.StdIO(flags=pyuv.UV_IGNORE))

    def start(self):
        """ start reading from every redirected pipe """
        for p in self._channels:
            p.start_read(self._on_read)

    @property
    def stdio(self):
        """ the list of ``pyuv.StdIO`` entries to pass to the child """
        return self._stdio

    def subscribe(self, label, listener):
        """ register ``listener`` for data read on the pipe named ``label`` """
        self._emitter.subscribe(label, listener)

    def unsubscribe(self, label, listener):
        """ remove a listener registered with :meth:`subscribe` """
        self._emitter.unsubscribe(label, listener)

    def stop(self, all_events=False):
        """ close the pipes that are still open; with ``all_events`` also
        close the event emitter """
        for p in self._channels:
            if not p.closed:
                p.close()

        if all_events:
            self._emitter.close()

    def _on_read(self, handle, data, error):
        # empty reads (no data, e.g. on error/EOF) are ignored
        if not data:
            return

        label = handle.label
        msg = dict(event=label, name=self.process.name, pid=self.process.pid,
                   data=data)
        self._emitter.publish(label, msg)
class RedirectStdin(object):
    """ redirect stdin, allowing multiple senders to write to the same pipe

    Writes are not sent directly: :meth:`write` and :meth:`writelines`
    publish "WRITE"/"WRITELINES" events, and the handlers subscribed in
    :meth:`start` forward the data to the pipe. Anything published before
    :meth:`start` has subscribed them has no handler to deliver it.
    """

    def __init__(self, loop, process):
        self.loop = loop
        self.process = process
        self.channel = pyuv.Pipe(loop)
        self.stdio = pyuv.StdIO(stream=self.channel,
                                flags=pyuv.UV_CREATE_PIPE |
                                pyuv.UV_READABLE_PIPE |
                                pyuv.UV_WRITABLE_PIPE)
        self._emitter = EventEmitter(loop)

    def start(self):
        """ subscribe the handlers that forward queued writes to the pipe """
        self._emitter.subscribe("WRITE", self._on_write)
        self._emitter.subscribe("WRITELINES", self._on_writelines)

    def write(self, data):
        """ send ``data`` to the pipe """
        self._emitter.publish("WRITE", data)

    def writelines(self, data):
        """ send a sequence of buffers to the pipe """
        self._emitter.publish("WRITELINES", data)

    def stop(self, all_events=False):
        """ close the pipe if still open; with ``all_events`` also close
        the event emitter """
        if not self.channel.closed:
            self.channel.close()

        if all_events:
            self._emitter.close()

    def _on_write(self, evtype, data):
        self.channel.write(data)

    def _on_writelines(self, evtype, data):
        self.channel.writelines(data)

    # NOTE: the former ``_on_read`` was removed. It was never registered as
    # a read callback here, and it read a ``label`` attribute that is never
    # set on this channel. Readable subclasses such as Stream define their own.
class Stream(RedirectStdin):
    """ A custom stdio pipe handed to the child process.

    Writing uses the inherited :meth:`write` / :meth:`writelines`. Once
    started, everything read back from the pipe is published as a ``READ``
    event to the listeners registered with :meth:`subscribe`.
    """

    def __init__(self, loop, process, id):
        super(Stream, self).__init__(loop, process)
        # slot index of this stream, as computed by whoever creates it
        self.id = id

    def start(self):
        """ enable the write handlers, then begin reading from the pipe """
        super(Stream, self).start()
        self.channel.start_read(self._on_read)

    def subscribe(self, listener):
        """ register ``listener`` for ``READ`` events """
        self._emitter.subscribe('READ', listener)

    def unsubscribe(self, listener):
        """ remove a listener previously given to :meth:`subscribe` """
        self._emitter.unsubscribe('READ', listener)

    def _on_read(self, handle, data, error):
        # only non-empty reads are forwarded to listeners
        if data:
            owner = self.process
            self._emitter.publish('READ', {
                'event': 'READ',
                'name': owner.name,
                'pid': owner.pid,
                'data': data,
            })
class ProcessWatcher(object):
    """ object to retrieve process stats periodically

    Listeners registered with :meth:`subscribe` receive ``"stat"`` events
    whose message is the dict returned by :meth:`refresh`, extended with
    the ``pid`` and ``os_pid`` of the watched process. The refresh timer
    runs while at least one :meth:`subscribe` listener remains.
    """

    def __init__(self, loop, process):
        self.loop = loop
        self.process = process
        self._last_info = None
        self.on_refresh_cb = None
        self._active = 0
        self._refcount = 0
        self._timer = None
        self._emitter = EventEmitter(loop)

    @property
    def active(self):
        """ True while the refresh timer is running """
        return atomic_read(self._active) > 0

    def subscribe(self, listener):
        """ register ``listener`` for "stat" events and start the timer """
        self._refcount = increment(self._refcount)
        self._emitter.subscribe("stat", listener)
        self._start()

    def subscribe_once(self, listener):
        """ register ``listener`` for the next "stat" event only.

        This does not start the refresh timer by itself.
        """
        self._emitter.subscribe_once("stat", listener)

    def unsubscribe(self, listener):
        """ remove a listener; stop the timer once none is left """
        # listeners are registered under "stat" (see subscribe); removing
        # them under any other event type would leave them attached.
        self._emitter.unsubscribe("stat", listener)
        # never let an extra unsubscribe drive the refcount negative
        if atomic_read(self._refcount) > 0:
            self._refcount = decrement(self._refcount)
        if not atomic_read(self._refcount):
            self.stop()

    def stop(self, all_events=False):
        """ stop the refresh timer; with ``all_events`` also close the
        event emitter """
        if self.active:
            self._active = decrement(self._active)
            self._timer.stop()
            # _start() allocates a new Timer on every restart, so release
            # this one instead of leaking the handle
            self._timer.close()
            self._timer = None

        if all_events:
            self._emitter.close()

    def _async_refresh(self, handle):
        # timer callback: refresh the stats and publish them
        try:
            self._last_info = self.refresh()
        except psutil.NoSuchProcess:
            # the process is gone: nothing left to watch
            self.stop()
            return

        # create the message
        msg = self._last_info.copy()
        msg.update({'pid': self.process.pid, 'os_pid': self.process.os_pid})

        # publish it
        self._emitter.publish("stat", msg)

    def refresh(self, interval=0):
        """ return fresh stats (see :func:`get_process_stats`) """
        return get_process_stats(self.process._pprocess, interval=interval)

    def _start(self):
        if not self.active:
            self._timer = pyuv.Timer(self.loop)
            # start the timer to refresh the information.
            # 0.1 is the minimum interval to fetch cpu stats for this
            # process.
            self._timer.start(self._async_refresh, 0.1, 0.1)
            self._active = increment(self._active)
class ProcessConfig(object):
    """ object to maintain a process config """

    DEFAULT_PARAMS = {
        "args": [],
        "env": {},
        "uid": None,
        "gid": None,
        "cwd": None,
        "shell": False,
        "redirect_output": [],
        "redirect_input": False,
        "custom_streams": [],
        "custom_channels": []}

    def __init__(self, name, cmd, **settings):
        """
        Initialize the ProcessConfig object

        Args:

        - **name**: name of the process
        - **cmd**: program command (string)

        Settings:

        - **args**: the arguments for the command to run. Can be a list or
          a string. If **args** is a string, it's split using
          :func:`shlex.split`. Defaults to an empty list.
        - **env**: a mapping containing the environment variables the command
          will run with. Optional
        - **uid**: int or str, user id
        - **gid**: int or str, user group id
        - **cwd**: working dir
        - **shell**: boolean, run the script in a shell (UNIX only)
        - **os_env**: boolean, pass the os environment to the program
        - **numprocesses**: int, the number of OS processes to launch for
          this description
        - **flapping**: a FlappingInfo instance, if flapping detection
          should be used. Flapping parameters are:

          - **attempts**: maximum number of attempts before we stop the
            process and set it to retry later
          - **window**: period in which we are testing the number of
            retries
          - **retry_in**: seconds, the time after which we restart the
            process and try to spawn it
          - **max_retry**: maximum number of retries before we give up
            and stop the process.

        - **redirect_output**: list of io to redirect (max 2). This is a
          list of custom labels to use for the redirection. Ex: ["a", "b"]
          will redirect stdout & stderr and stdout events will be labeled "a"
        - **redirect_input**: Boolean (False is the default). Set it if
          you want to be able to write to stdin.
        - **custom_streams**: list of labels of additional streams to create
        - **custom_channels**: list of additional channels to pass to the
          process
        - **graceful_timeout**: graceful time before we send a SIGKILL
          to the process (which definitely kills it). By default 30s.
          This is the time we give a process to exit cleanly.
        """
        self.name = name
        self.cmd = cmd
        self.settings = settings

    def __str__(self):
        return "process: %s" % self.name

    def make_process(self, loop, pid, label, env=None, on_exit=None):
        """ create a Process object from the configuration

        Args:

        - **loop**: main pyuv loop instance that will maintain the process
        - **pid**: process id, generally given by the manager
        - **label**: the job label, used as the name of the created Process
        - **env**: optional mapping of extra environment variables. They
          take precedence over the configured env and the os environment.
        - **on_exit**: callback called when the process exited.

        Neither the configured settings nor :attr:`DEFAULT_PARAMS` are
        modified: the process gets its own copy of every container.
        """
        import copy

        params = {}
        for name, default in self.DEFAULT_PARAMS.items():
            # copy the default so processes never share (and mutate) the
            # class-level containers
            params[name] = self.settings.get(name, copy.copy(default))

        # build a fresh environment instead of updating the configured dict
        # in place; the ``env`` argument is kept so it always applies last.
        process_env = dict(params.get('env') or {})
        if self.settings.get('os_env', False):
            process_env.update(os.environ)
        if env is not None:
            process_env.update(env)
        params['env'] = process_env

        params['on_exit_cb'] = on_exit
        return Process(loop, pid, label, self.cmd, **params)

    def __getitem__(self, key):
        if key == "name":
            return self.name

        if key == "cmd":
            return self.cmd

        return self.settings[key]

    def __setitem__(self, key, value):
        if key in ("name", "cmd"):
            setattr(self, key, value)
        else:
            self.settings[key] = value

    def __contains__(self, key):
        return key in ('name', 'cmd') or key in self.settings

    def get(self, key, default=None):
        """ return the value for ``key`` or ``default`` if missing """
        try:
            return self[key]
        except KeyError:
            return default

    def to_dict(self):
        """ return the config as a flat dict (name, cmd and settings) """
        d = dict(name=self.name, cmd=self.cmd)
        d.update(self.settings)
        return d

    @classmethod
    def from_dict(cls, config):
        """ build a config from a dict as returned by :meth:`to_dict`.

        Raises ValueError if ``name`` or ``cmd`` is missing. The given dict
        is not modified.
        """
        d = config.copy()
        try:
            name = d.pop('name')
            cmd = d.pop('cmd')
        except KeyError:
            raise ValueError("invalid config dict")

        return cls(name, cmd, **d)
class Process(object):
    """ class wrapping a process

    Args:

    - **loop**: main application loop (a pyuv Loop instance)
    - **pid**: internal process id, generally given by the manager
    - **name**: name of the process
    - **cmd**: program command (string). It is split with
      :func:`shlex.split`; when it holds more than one token, the first one
      becomes the program and the others are prepended to **args**.
    - **args**: the arguments for the command to run. Can be a list or
      a string. If **args** is a string, it's split using
      :func:`shlex.split`. Defaults to None.
    - **env**: a mapping containing the environment variables the command
      will run with. Optional. It is copied, so later updates (e.g. through
      :meth:`spawn`) never modify the caller's mapping.
    - **uid**: int or str, user id
    - **gid**: int or str, user group id
    - **cwd**: working dir. Defaults to the current working directory.
    - **detach**: the process is launched but won't be monitored and
      won't exit when the manager is stopped.
    - **shell**: boolean, run the command through ``sh -c`` (UNIX only)
    - **redirect_output**: list of io to redirect (max 2). This is a list
      of custom labels to use for the redirection. Ex: ["a", "b"] will
      redirect stdout & stderr and stdout events will be labeled "a"
    - **redirect_input**: Boolean (False is the default). Set it if
      you want to be able to write to stdin.
    - **custom_streams**: list of additional streams that should be created
      and passed to process. This is a list of streams labels. They become
      available through :attr:`streams` attribute.
    - **custom_channels**: list of additional (open) channels that should
      be passed to process.
    - **on_exit_cb**: called as ``on_exit_cb(process, exit_status,
      term_signal)`` once the process exited.
    """

    def __init__(self, loop, pid, name, cmd, args=None, env=None, uid=None,
                 gid=None, cwd=None, detach=False, shell=False,
                 redirect_output=None, redirect_input=False,
                 custom_streams=None, custom_channels=None, on_exit_cb=None):
        self.loop = loop
        self.pid = pid
        self.name = name
        # private copy: spawn(env=...) updates it in place
        self.env = dict(env or {})

        # set command
        self.cmd = bytestring(cmd)

        # remove args from the command
        args_ = shlex.split(self.cmd)
        if len(args_) <= 1:
            # a single token (or an empty command) is kept verbatim
            self.args = []
        else:
            self.cmd = args_[0]
            self.args = args_[1:]

        # if args have been passed to the options then add them
        if args:
            if isinstance(args, six.string_types):
                self.args.extend(shlex.split(bytestring(args)))
            else:
                self.args.extend([bytestring(arg) for arg in args])

        # replace environment variables in args
        # $PORT for example will become the given env variable.
        self.args = [substitute_env(arg, self.env) for arg in self.args]

        if shell:
            self.args = ['-c', self.cmd] + self.args
            self.cmd = "sh"

        self.uid = uid
        self.gid = gid

        if not IS_WINDOWS:
            if self.uid is not None:
                self.uid = check_uid(uid)

            if self.gid is not None:
                self.gid = check_gid(gid)

        self.cwd = cwd or getcwd()
        # fresh lists per instance instead of shared mutable defaults
        self.redirect_output = (redirect_output
                                if redirect_output is not None else [])
        self.redirect_input = redirect_input
        self.custom_streams = (custom_streams
                               if custom_streams is not None else [])
        self.custom_channels = (custom_channels
                                if custom_channels is not None else [])
        self._redirect_io = None
        self._redirect_in = None
        self.streams = {}
        self.detach = detach
        self.on_exit_cb = on_exit_cb
        self._process = None
        self._pprocess = None
        self._process_watcher = None
        self._os_pid = None
        self._info = None
        self.stopped = False
        self.running = False
        self._running = False
        self.graceful_time = 0
        self.graceful_timeout = None
        self.once = False

        self._setup_stdio()

    def _setup_stdio(self):
        # stdin is ignored unless input redirection was requested
        if not self.redirect_input:
            self._stdio = [pyuv.StdIO(flags=pyuv.UV_IGNORE)]
        else:
            self._redirect_in = RedirectStdin(self.loop, self)
            self._stdio = [self._redirect_in.stdio]

        self._redirect_io = RedirectIO(self.loop, self,
                                       self.redirect_output)
        self._stdio.extend(self._redirect_io.stdio)

        # create custom streams; each one is told its index in the stdio list
        for label in self.custom_streams:
            stream = self.streams[label] = Stream(self.loop, self,
                                                  len(self._stdio))
            self._stdio.append(stream.stdio)

        # create containers for custom channels.
        for channel in self.custom_channels:
            assert not channel.closed, \
                "Closed channel {0!r} can't be passed to process!" \
                .format(channel)
            self._stdio.append(pyuv.StdIO(stream=channel,
                                          flags=pyuv.UV_INHERIT_STREAM))

    def spawn(self, once=False, graceful_timeout=None, env=None):
        """ spawn the process

        - **once**: stored in :attr:`once` (reported as ``commited`` in
          :attr:`info`)
        - **graceful_timeout**: stored in :attr:`graceful_timeout`
        - **env**: extra environment variables merged into :attr:`env`
        """
        self.once = once
        self.graceful_timeout = graceful_timeout

        if env is not None:
            self.env.update(env)

        kwargs = dict(
            file=self.cmd,
            exit_callback=self._exit_cb,
            args=self.args,
            env=self.env,
            cwd=self.cwd,
            stdio=self._stdio)

        flags = 0
        if self.uid is not None:
            kwargs['uid'] = self.uid
            flags |= pyuv.UV_PROCESS_SETUID

        if self.gid is not None:
            kwargs['gid'] = self.gid
            flags |= pyuv.UV_PROCESS_SETGID

        if self.detach:
            flags |= pyuv.UV_PROCESS_DETACHED

        # hand the flags to pyuv: without them uid/gid switching and
        # detaching are not requested from libuv
        kwargs['flags'] = flags

        self.running = True
        self._process = pyuv.Process(self.loop)

        # spawn the process
        self._process.spawn(**kwargs)
        self._running = True
        self._os_pid = self._process.pid
        self._pprocess = psutil.Process(self._process.pid)

        # start to cycle the cpu stats so we can have an accurate number on
        # the first call of ``Process.stats``
        self.loop.queue_work(self._init_cpustats)

        # start redirecting IO
        self._redirect_io.start()

        if self._redirect_in is not None:
            self._redirect_in.start()

        for stream in self.streams.values():
            stream.start()

    @property
    def active(self):
        """ True while the pyuv process handle is active (False before
        spawn) """
        return self._process is not None and self._process.active

    @property
    def closed(self):
        """ True once the pyuv process handle is closed (or never created) """
        return self._process is None or self._process.closed

    @property
    def os_pid(self):
        """ return the OS process pid """
        if self._os_pid is None:
            self._os_pid = self._process.pid
        return self._os_pid

    @property
    def info(self):
        """ return the process info. If the process is monitored it
        returns the last information stored asynchronously by the watcher"""

        # info we have on this process
        if self._info is None:
            self._info = dict(pid=self.pid, name=self.name, cmd=self.cmd,
                              args=self.args, env=self.env, uid=self.uid,
                              gid=self.gid, os_pid=None, create_time=None,
                              commited=self.once,
                              redirect_output=self.redirect_output,
                              redirect_input=self.redirect_input,
                              custom_streams=self.custom_streams)

        if (self._info.get('create_time') is None and
                self._pprocess is not None):
            create_time = self._pprocess.create_time
            # psutil >= 2.0 exposes create_time() as a method, older
            # releases as a plain attribute
            if callable(create_time):
                create_time = create_time()
            self._info.update({'os_pid': self.os_pid,
                               'create_time': create_time})

        self._info['active'] = self.active
        return self._info

    @property
    def stats(self):
        """ return :func:`get_process_stats` for the spawned process, or
        None before spawn """
        if not self._pprocess:
            return
        return get_process_stats(self._pprocess, 0.0)

    @property
    def status(self):
        """ return the process status, or None before spawn """
        if not self._pprocess:
            return

        status = self._pprocess.status
        # psutil >= 2.0 exposes status() as a method (like create_time()),
        # older releases as a plain attribute
        return status() if callable(status) else status

    def __lt__(self, other):
        return (self.pid != other.pid and
                self.graceful_time < other.graceful_time)

    # NOTE: no ``__cmp__ = __lt__`` alias. __cmp__ must return an int, and a
    # False result (0) made unrelated processes compare equal on Python 2.
    # Sorting only needs __lt__.

    def monitor(self, listener=None):
        """ start to monitor the process

        Listener can be any callable and receive *("stat", process_info)*
        """
        if not self._process_watcher:
            self._process_watcher = ProcessWatcher(self.loop, self)

        self._process_watcher.subscribe(listener)

    def unmonitor(self, listener):
        """ stop monitoring this process.

        listener is the callback passed to the monitor function
        previously.
        """
        if not self._process_watcher:
            return

        self._process_watcher.unsubscribe(listener)

    def monitor_io(self, io_label, listener):
        """ subscribe to registered IO events """
        if not self._redirect_io:
            raise IOError("%s not redirected" % self.name)
        self._redirect_io.subscribe(io_label, listener)

    def unmonitor_io(self, io_label, listener):
        """ unsubscribe to the IO event """
        if not self._redirect_io:
            return
        self._redirect_io.unsubscribe(io_label, listener)

    def write(self, data):
        """ send data to the process via stdin"""
        if not self._redirect_in:
            raise IOError("stdin not redirected")
        self._redirect_in.write(data)

    def writelines(self, data):
        """ send data to the process via stdin"""
        if not self._redirect_in:
            raise IOError("stdin not redirected")
        self._redirect_in.writelines(data)

    def stop(self):
        """ stop the process (send SIGTERM) """
        self.kill(signal.SIGTERM)

    def kill(self, signum):
        """ send a signal to the process; no-op if it is not active """
        if not self.active:
            return

        self._process.kill(signum)

    def close(self):
        """ close the process handle; no-op if never spawned or already
        closed (the exit callback closes it too) """
        if self._process is not None and not self._process.closed:
            self._process.close()

    def _init_cpustats(self):
        # queued on the loop's work queue by spawn()
        try:
            get_process_stats(self._pprocess, 0.1)
        except psutil.NoSuchProcess:
            # catch this error. It can happen when the process is closing
            # very fast
            pass

    def _exit_cb(self, handle, exit_status, term_signal):
        if self._redirect_io is not None:
            self._redirect_io.stop(all_events=True)

        if self._redirect_in is not None:
            self._redirect_in.stop(all_events=True)

        for custom_io in self.streams.values():
            custom_io.stop(all_events=True)

        if self._process_watcher is not None:
            self._process_watcher.stop(all_events=True)

        # keep both running flags in sync
        self.running = False
        self._running = False
        handle.close()

        # handle the exit callback
        if self.on_exit_cb is not None:
            self.on_exit_cb(self, exit_status, term_signal)