This URL has Read-Only access.

Statistics
| Branch: | Tag: | Revision:

root / py / scenic / process.py @ eb9e75ef

History | View | Annotate | Download (9.9 kB)

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
"""
4
Streamer Process management.
5
"""
6
import os
7
import time
8
import logging
9
import signal
10

    
11
from twisted.internet import error
12
from twisted.internet import protocol
13
from twisted.internet import reactor
14
from twisted.python import procutils
15
from twisted.internet import utils
16

    
17
from scenic import sig
18

    
19
# constants for the slave process
20
STATE_STARTING = "STARTING"
21
STATE_RUNNING = "RUNNING"
22
STATE_STOPPING = "STOPPING"
23
STATE_STOPPED = "STOPPED"
24

    
25
class ProcessError(Exception):
26
    pass
27

    
28
def run_once(executable, *args):
29
    """
30
    Runs a command, without looking at its output or return value.
31
    Returns a Deferred or None.
32
    """
33
    def _cb(result):
34
        #print(result)
35
        pass
36
    try:
37
        executable = procutils.which(executable)[0]
38
    except IndexError:
39
        print("Could not find executable %s" % (executable))
40
        return None
41
    else:
42
        print("Calling %s %s" % (executable, list(args)))
43
        d = utils.getProcessValue(executable, args, os.environ, '.', reactor)
44
        d.addCallback(_cb)
45
        return d
46

    
47
class ProcessIO(protocol.ProcessProtocol):
48
    """
49
    process IO
50
     
51
    Its stdout and stderr streams are logged to a file.    
52
    """
53
    def __init__(self, manager):
54
        """
55
        @param slave: Manager instance.
56
        """
57
        self.manager = manager
58

    
59
    def connectionMade(self):
60
        self.manager._on_connection_made()
61

    
62
    def outReceived(self, data):
63
        for line in data.splitlines():
64
            if line != "":
65
                self.manager.stdout_line_signal(line)
66

    
67
    def errReceived(self, data):
68
        for line in data.splitlines().strip():
69
            if line != "":
70
                self.manager.stderr_line_signal(line)
71

    
72
    def processEnded(self, reason):
73
        exit_code = reason.value.exitCode
74
        if exit_code is None:
75
            exit_code = reason.value.signal
76
        self.manager._on_process_ended(exit_code)
77
    
78
    def processExited(self, reason):
79
        self.manager.log("process has exited " + str(reason.value))
80
    
81
class ProcessManager(object):
82
    """
83
    Manages a streamer process. 
84
    """
85
    def __init__(self, command=None, identifier=None, env=None):
86
        """
87
        @param command: Shell string. The first item is the name of the name of the executable.
88
        @param identifier: Any string. 
89
        """
90
        #Used as a file name, so avoid spaces and exotic characters.
91
        self._process_transport = None
92
        self._child_process = None
93
        self._time_child_started = None
94
        self._child_running_time = None
95
        self.state = STATE_STOPPED
96
        self.command = command # string (bash)
97
        self.time_before_sigkill = 5.0 # seconds
98
        self.identifier = identifier # title
99
        self.env = {} # environment variables for the child process
100
        if env is not None:
101
            self.env.update(env)
102
        self.pid = None
103
        if self.identifier is None:
104
            self.identifier = "default"
105
        self.log_level = logging.DEBUG
106
        self._delayed_kill = None # DelayedCall instance
107
        
108
        self.state_changed_signal = sig.Signal()
109
        self.stdout_line_signal = sig.Signal()
110
        self.stderr_line_signal = sig.Signal()
111
    
112
    def _before_shutdown(self):
113
        """
114
        Called before twisted's reactor shutdown.
115
        to make sure that the process is dead before quitting.
116
        """
117
        if self.state in [STATE_STARTING, STATE_RUNNING, STATE_STOPPING]:
118
            msg = "Child still %s. Stopping it before shutdown." % (self.state)
119
            self.log(msg)
120
            self.stop()
121
    
122
    def is_alive(self):
123
        """
124
        Checks if the child is alive.
125
        """
126
        #TODO Use this
127
        if self.state == STATE_RUNNING:
128
            proc = self._process_transport
129
            try:
130
                proc.signalProcess(0)
131
            except (OSError, error.ProcessExitedAlready):
132
                msg = "Lost process %s. Error sending it an empty signal." % (self.identifier)
133
                print(msg)
134
                return False
135
            else:
136
                return True
137
        else:
138
            return False
139
    
140
    def start(self):
141
        """
142
        Starts the child process
143
        """
144
        if self.state in [STATE_RUNNING, STATE_STARTING]:
145
            msg = "Child is already %s. Cannot start it." % (self.state)
146
            raise ProcessError(msg)
147
        elif self.state == STATE_STOPPING:
148
            msg = "Child is %s. Please try again to start it when it will be stopped." % (self.state)
149
            raise ProcessError(msg)
150
        if self.command is None or self.command.strip() == "":
151
            msg = "You must provide a command to be run."
152
            raise ProcessError(msg)
153
        
154
        self.log("Will run command %s %s" % (self.identifier, str(self.command)))
155
        self._child_process = ProcessIO(self)
156
        environ = {}
157
        environ.update(os.environ)
158
        for key, val in self.env.iteritems():
159
            environ[key] = val
160
        self.set_child_state(STATE_STARTING)
161
        shell = "/bin/sh"
162
        if os.path.exists("/bin/bash"):
163
            shell = "/bin/bash"
164
        self._time_child_started = time.time()
165
        self._process_transport = reactor.spawnProcess(self._child_process, shell, [shell, "-c", "exec %s" % (self.command)], environ, usePTY=True)
166
        self.pid = self._process_transport.pid
167
        self.log("Spawned child %s with pid %s." % (self.identifier, self.pid))
168
    
169
    def _on_connection_made(self):
170
        if not STATE_STARTING:
171
            self.log("Connection made even if we were not starting the child process.", logging.ERROR)
172
        self.set_child_state(STATE_RUNNING)
173
    
174
    def stop(self):
175
        """
176
        Stops the child process
177
        """
178
        def _later_check(pid):
179
            if self.pid == pid:
180
                if self.state == STATE_STOPPING:
181
                    msg = "Child process %s not dead." % (self.identifier)
182
                    print msg
183
                    try:
184
                        self._process_transport.signalProcess(signal.SIGKILL)
185
                    except OSError, e:
186
                        msg = "Error sending signal %s to process %s. %s" % (signal_to_send, self.identifier, e)
187
                        print msg # raise?
188
                    except error.ProcessExitedAlready:
189
                        msg = "Process %s had already exited while trying to send signal %s." % (self.identifier, "SIGKILL")
190
                        print msg # raise ?
191
                elif self.state == STATE_STOPPED:
192
                    msg = "Successfully killed process after least than the %f seconds. State is %s." % (self.time_before_sigkill, self.state)
193
                    self.log(msg)
194
            self._delayed_kill = None
195
        
196
        # TODO: do callLater calls to check if the process is still running or not.
197
        #see twisted.internet.process._BaseProcess.reapProcess
198
        signal_to_send = None
199
        if self.state in [STATE_RUNNING, STATE_STARTING]:
200
            self.set_child_state(STATE_STOPPING)
201
            self.log('Will stop process using SIGTERM.')
202
            signal_to_send = signal.SIGTERM
203
        elif self.state == STATE_STOPPING:
204
            self.log('Trying to kill again the child process using SIGKILL.')
205
            signal_to_send = signal.SIGKILL
206
        else: # STOPPED
207
            msg = "Process is already stopped."
208
            self.set_child_state(STATE_STOPPED)
209
            print msg # raise?
210
        if signal_to_send is not None:
211
            try:
212
                self._process_transport.signalProcess(signal_to_send)
213
            except OSError, e:
214
                msg = "Error sending signal %s to process %s. %s" % (signal_to_send, self.identifier, e)
215
                print msg # raise?
216
            except error.ProcessExitedAlready:
217
                if signal_to_send == signal.SIGTERM:
218
                    msg = "Process %s had already exited while trying to send signal %s." % (self.identifier, signal_to_send)
219
                    print msg # raise ?
220
            else:
221
                if signal_to_send == signal.SIGTERM:
222
                    self._delayed_kill = reactor.callLater(self.time_before_sigkill, _later_check, self.pid)
223

    
224
    def log(self, msg, level=logging.DEBUG):
225
        """
226
        Logs to Master.
227
        (through stdout)
228
        """
229
        if level >= self.log_level:
230
            print "%9s process: %s" % (self.identifier, msg)
231

    
232
    def _on_process_ended(self, exit_code):
233
        self._child_running_time = time.time() - self._time_child_started
234
        if self.state == STATE_STOPPING:
235
            self.log('Child process exited as expected.')
236
            if self._delayed_kill is not None:
237
                if self._delayed_kill.active:
238
                    self._delayed_kill.cancel()
239
                self._delayed_kill = None
240
        elif self.state == STATE_STARTING:
241
            self.log('Child process exited while trying to start it.')
242
        elif self.state == STATE_RUNNING:
243
            if exit_code == 0:
244
                self.log('Child process exited.')
245
            else:
246
                self.log('Child process exited with error.')
247
        self._process_transport.loseConnection() # close file handles
248
        self.log("Child exitted with %s" % (exit_code), logging.INFO)
249
        self.set_child_state(STATE_STOPPED)
250
        
251
    def set_child_state(self, new_state):
252
        """
253
        Handles state changes.
254
        """
255
        if self.state != new_state:
256
            if new_state == STATE_STOPPED:
257
                self.log("Child lived for %s seconds." % (self._child_running_time))
258
                #self.io_protocol.send_state(new_state, self._child_running_time)
259
            elif self.state == STATE_STOPPED and new_state != STATE_STARTING:
260
                raise RuntimeError("Cannot go from STATE_STOPPED to %s " % (new_state))
261
            self.state_changed_signal(self, new_state)
262
            self.log("child state: %s" % (new_state))
263
        else:
264
            self.log("State is same as before: %s" % (new_state))
265
        self.state = new_state
266

    
267
    def __str__(self):
268
        return "%s %s" % (self.identifier, id(self))