This URL has Read-Only access.

Statistics
| Branch: | Tag: | Revision:

root / py / scenic / streamer.py @ 15818c72

History | View | Annotate | Download (14.2 kB)

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
# 
4
# Scenic
5
# Copyright (C) 2008 Société des arts technologiques (SAT)
6
# http://www.sat.qc.ca
7
# All rights reserved.
8
#
9
# This file is free software: you can redistribute it and/or modify
10
# it under the terms of the GNU General Public License as published by
11
# the Free Software Foundation, either version 2 of the License, or
12
# (at your option) any later version.
13
#
14
# Scenic is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
# GNU General Public License for more details.
18
#
19
# You should have received a copy of the GNU General Public License
20
# along with Scenic. If not, see <http://www.gnu.org/licenses/>.
21

    
22
"""
23
Manages local streamer processes.
24
"""
25

    
26
from scenic import process
27
from scenic import sig
28

    
29
class StreamerManager(object):
30
    """
31
    Manages local streamer processes.
32
    """
33
    def __init__(self, app):
34
        self.app = app
35
        # commands
36
        self.milhouse_recv_cmd = None
37
        self.milhouse_send_cmd = None
38
        self.sender = None
39
        self.receiver = None
40
        self.state = process.STATE_STOPPED
41
        self.state_changed_signal = sig.Signal()
42
        # for stats
43
        self.session_details = None # either None or a big dict
44
        self.rtcp_stats = None # either None or a big dict
45

    
46
    def _gather_config_to_stream(self, addr):
47
        """
48
        Gathers all settings in a big dict.
49
        
50
        Useful for feedback to the user.
51
        """
52
        contact_name = addr
53
        contact = self.app._get_contact_by_addr(addr)
54
        if contact is not None:
55
            contact_name = contact["name"]
56
        
57
        remote_config = self.app.remote_config
58
        send_width, send_height = self.app.config.video_capture_size.split("x")
59
        receive_width, receive_height = remote_config["video"]["capture_size"].split("x")
60
        self.session_details = {
61
            "peer": {
62
                "address": addr,
63
                "name": contact_name,
64
                },
65
            # ----------------- send ---------------
66
            "send": {
67
                "video": {
68
                    "source": self.app.config.video_source,
69
                    "device": self.app.config.video_device,
70
                    "bitrate": self.app.config.video_bitrate,
71
                    "codec": remote_config["video"]["codec"],
72
                    "width": int(send_width), # int
73
                    "height": int(send_height), # int
74
                    "aspect-ratio": self.app.config.video_aspect_ratio,
75
                    "port": self.app.remote_config["video"]["port"], 
76
                },
77
                "audio": {
78
                    "source": self.app.config.audio_source,
79
                    "numchannels": self.app.config.audio_channels,
80
                    "codec": self.app.config.audio_codec,
81
                    "port": self.app.remote_config["audio"]["port"], 
82
                }
83
            },
84
            # -------------------- recv ------------
85
            "receive": {
86
                "video": {
87
                    "sink": self.app.config.video_sink,
88
                    "codec": self.app.config.video_codec,
89
                    "width": int(receive_width), # int
90
                    "height": int(receive_height), # int
91
                    "deinterlace": self.app.config.video_deinterlace, # bool
92
                    "aspect-ratio": remote_config["video"]["aspect_ratio"],
93
                    "port": str(self.app.recv_video_port), #decided by the app
94
                    "window-title": "\"From %s\"" % (contact_name), #TODO: i18n
95
                    "jitterbuffer": self.app.config.video_jitterbuffer, 
96
                    "fullscreen": self.app.config.video_fullscreen, # bool
97
                    "bitrate": remote_config["video"]["bitrate"], # float
98
                },
99
                "audio": {
100
                    "numchannels": self.app.config.audio_channels, # int
101
                    "codec": self.app.config.audio_codec, 
102
                    "port": self.app.recv_audio_port,
103
                    "sink": self.app.config.audio_sink
104
                }
105
            }
106
        }
107
        if self.session_details["send"]["video"]["source"] != "v4l2src":
108
            self.session_details["send"]["video"]["device"] = None
109
        if self.session_details["send"]["video"]["codec"] == "theora":
110
            self.session_details["send"]["video"]["bitrate"] = None
111
        print(str(self.session_details))
112
        
113
    def start(self, host):
114
        """
115
        Starts the sender and receiver processes.
116
        
117
        @param host: str ip addr
118
        Raises a RuntimeError if a sesison is already in progress.
119
        """
120
        if self.state != process.STATE_STOPPED:
121
            raise RuntimeError("Cannot start streamers since they are %s." % (self.state)) # the programmer has done something wrong if we're here.
122
        
123
        self._gather_config_to_stream(host)
124
        details = self.session_details
125

    
126
        # ------------------ send ---------------
127
        self.milhouse_send_cmd = [
128
            "milhouse", 
129
            '--sender', 
130
            '--address', details["peer"]["address"],
131
            '--videosource', details["send"]["video"]["source"],
132
            '--videocodec', details["send"]["video"]["codec"],
133
            '--videoport', str(details["send"]["video"]["port"]),
134
            '--width', str(details["send"]["video"]["width"]),
135
            '--height', str(details["send"]["video"]["height"]),
136
            '--aspect-ratio', str(details["send"]["video"]["aspect-ratio"]),
137
            '--audiosource', details["send"]["audio"]["source"],
138
            '--numchannels', str(details["send"]["audio"]["numchannels"]),
139
            '--audiocodec', details["send"]["audio"]["codec"],
140
            '--audioport', str(details["send"]["audio"]["port"]),
141
            ]
142
        if details["send"]["video"]["source"] == "v4l2src":
143
            self.milhouse_send_cmd.extend(["--videodevice", details["send"]["video"]["device"]])
144
        if details["send"]["video"]["codec"] != "theora":
145
            self.milhouse_send_cmd.extend(['--videobitrate', str(int(details["send"]["video"]["bitrate"] * 1000000))])
146

    
147
        # ------------------- recv ----------------
148
        self.milhouse_recv_cmd = [
149
            "milhouse",
150
            '--receiver', 
151
            '--address', details["peer"]["address"],
152
            '--videosink', details["receive"]["video"]["sink"],
153
            '--videocodec', details["receive"]["video"]["codec"],
154
            '--videoport', str(details["receive"]["video"]["port"]),
155
            '--jitterbuffer', str(details["receive"]["video"]["jitterbuffer"]),
156
            '--width', str(details["receive"]["video"]["width"]),
157
            '--height', str(details["receive"]["video"]["height"]),
158
            '--aspect-ratio', details["receive"]["video"]["aspect-ratio"],
159
            '--audiosink', details["receive"]["audio"]["sink"],
160
            '--numchannels', str(details["receive"]["audio"]["numchannels"]),
161
            '--audiocodec', details["receive"]["audio"]["codec"],
162
            '--audioport', str(details["receive"]["audio"]["port"]),
163
            '--window-title', details["receive"]["video"]["window-title"],
164
            ]
165
        if details["receive"]["video"]["fullscreen"]:
166
            self.milhouse_recv_cmd.append('--fullscreen')
167
        if details["receive"]["video"]["deinterlace"]:
168
            self.milhouse_recv_cmd.append('--deinterlace')
169

    
170
        # setting up
171
        self.rtcp_stats = {
172
            "send": {
173
                "video": {
174
                    "packets-lost": 0,
175
                    "packets-sent": 0,
176
                    "jitter": 0,
177
                    "bitrate": 0,
178
                    "connected": False
179
                },
180
                "audio": {
181
                    "packets-lost": 0,
182
                    "packets-sent": 0,
183
                    "jitter": 0,
184
                    "bitrate": 0,
185
                    "connected": False
186
                }
187
            },
188
            "receive": {
189
                "video": {
190
                    "connected": False
191
                },
192
                "audio": {
193
                    "connected": False
194
                }
195
            }
196
        }
197

    
198
        recv_cmd = " ".join(self.milhouse_recv_cmd)
199
        self.receiver = process.ProcessManager(command=recv_cmd, identifier="receiver")
200
        self.receiver.state_changed_signal.connect(self.on_process_state_changed)
201
        self.receiver.stdout_line_signal.connect(self.on_receiver_stdout_line)
202
        self.receiver.stderr_line_signal.connect(self.on_receiver_stderr_line)
203
        send_cmd = " ".join(self.milhouse_send_cmd)
204
        self.sender = process.ProcessManager(command=send_cmd, identifier="sender")
205
        self.sender.state_changed_signal.connect(self.on_process_state_changed)
206
        self.sender.stdout_line_signal.connect(self.on_sender_stdout_line)
207
        self.sender.stderr_line_signal.connect(self.on_sender_stderr_line)
208
        # starting
209
        self._set_state(process.STATE_STARTING)
210
        print "$", send_cmd
211
        self.sender.start()
212
        print "$", recv_cmd
213
        self.receiver.start()
214

    
215
    def on_receiver_stdout_line(self, line):
216
        """
217
        Handles a new line from our receiver process' stdout
218
        """
219
        if "stream connected" in line:
220
            if "audio" in line:
221
                self.rtcp_stats["receive"]["audio"]["connected"] = True
222
            elif "video" in line:
223
                self.rtcp_stats["receive"]["video"]["connected"] = True
224
        else:
225
            print "%9s stdout: %s" % (self.receiver.identifier, line)
226

    
227
    def on_receiver_stderr_line(self, line):
228
        """
229
        Handles a new line from our receiver process' stderr
230
        """
231
        print "%9s stdout: %s" % (self.receiver.identifier, line)
232
    
233
    def on_sender_stdout_line(self, line):
234
        """
235
        Handles a new line from our receiver process' stdout
236
        """
237
        def _line_contains_a_video_codec(line):
238
            return "mpeg4" in line or "theora" in line or "h263" in line or "h264" in line
239
        def _line_contains_an_audio_codec(line):
240
            return "raw" in line or "mp3" in line or "vorbis" in line
241
            
242
        print "%9s stdout: %s" % (self.sender.identifier, line)
243
        if "PACKETS-LOST" in line:
244
            if _line_contains_a_video_codec(line):
245
                self.rtcp_stats["send"]["video"]["packets-lost"] = int(line.split(":")[-1])
246
            elif _line_contains_an_audio_codec(line):
247
                self.rtcp_stats["send"]["audio"]["packets-lost"] = int(line.split(":")[-1])
248
        if "PACKETS-SENT" in line:
249
            if _line_contains_a_video_codec(line):
250
                self.rtcp_stats["send"]["video"]["packets-sent"] = int(line.split(":")[-1])
251
            elif _line_contains_an_audio_codec(line):
252
                self.rtcp_stats["send"]["audio"]["packets-sent"] = int(line.split(":")[-1])
253
        elif "JITTER" in line:
254
            if _line_contains_a_video_codec(line):
255
                self.rtcp_stats["send"]["video"]["jitter"] = int(line.split(":")[-1])
256
            elif _line_contains_an_audio_codec(line):
257
                self.rtcp_stats["send"]["audio"]["jitter"] = int(line.split(":")[-1])
258
        elif "connected" in line:
259
            if _line_contains_a_video_codec(line):
260
                self.rtcp_stats["send"]["video"]["connected"] = True
261
            elif _line_contains_an_audio_codec(line):
262
                self.rtcp_stats["send"]["audio"]["connected"] = True
263

    
264
    def on_sender_stderr_line(self, line):
265
        """
266
        Handles a new line from our receiver process' stderr
267
        """
268
        print "%9s stderr: %s" % (self.sender.identifier, line)
269

    
270
    def is_busy(self):
271
        """
272
        Retuns True if a streaming session is in progress.
273
        """
274
        return self.state != process.STATE_STOPPED
275

    
276
    def on_process_state_changed(self, process_manager, process_state):
277
        """
278
        Slot for the ProcessManager.state_changed_signal
279
        Calls stop() if one of the processes crashed.
280
        """
281
        print process_manager, process_state
282
        if process_state == process.STATE_RUNNING:
283
            # As soon as one is running, set our state to running
284
            if self.state == process.STATE_STARTING:
285
                self._set_state(process.STATE_RUNNING)
286
        elif process_state == process.STATE_STOPPING:
287
            pass
288
        elif process_state == process.STATE_STARTING:
289
            pass
290
        elif process_state == process.STATE_STOPPED:
291
            # As soon as one crashes or is not able to start, stop all streamer processes.
292
            if self.state in [process.STATE_RUNNING, process.STATE_STARTING]:
293
                print("A streamer process died. Stopping the local streamer manager.")
294
                self.stop() # sets self.state to STOPPING
295
            # Next, if all streamers are dead, we can say this manager is stopped
296
            if self.state == process.STATE_STOPPING:
297
                one_is_left = False
298
                for proc in [self.sender, self.receiver]:
299
                    if process_manager is not proc and proc.state != process.STATE_STOPPED:
300
                        print("Streamer process %s is not dead, so we are not done stopping. Its state is %s." % (proc, proc.state))
301
                        one_is_left = True
302
                if not one_is_left:
303
                    print "Setting streamers manager to STOPPED"
304
                    self._set_state(process.STATE_STOPPED)
305
    
306
    def _set_state(self, new_state):
307
        """
308
        Handles state changes.
309
        """
310
        if self.state != new_state:
311
            self.state_changed_signal(self, new_state)
312
            self.state = new_state
313
        else:
314
            raise RuntimeError("Setting state to %s, which is already the current state." % (self.state))
315
            
316
    def stop(self):
317
        """
318
        Stops the sender and receiver processes.
319
        Does not send any message to the remote peer ! This must be done somewhere else.
320
        """
321
        #TODO: return a deferred
322
        # stopping
323
        if self.state in [process.STATE_RUNNING, process.STATE_STARTING]:
324
            self._set_state(process.STATE_STOPPING)
325
            for proc in [self.sender, self.receiver]:
326
                if proc is not None:
327
                    if proc.state != process.STATE_STOPPED and proc.state != process.STATE_STOPPING:
328
                        proc.stop()