From 5d378da97a58d41e7955bf2c75a4dfc12efd3540 Mon Sep 17 00:00:00 2001 From: Paul Schneider Date: Wed, 28 Nov 2018 13:56:35 +0000 Subject: [PATCH] initiating a live casting --- .../Communicating/LiveController.cs | 83 ++++++++++++++++ Yavsc/Startup/Startup.WebSockets.cs | 96 ------------------- Yavsc/ViewModels/Streaming/LiveCastMeta.cs | 16 ++++ 3 files changed, 99 insertions(+), 96 deletions(-) create mode 100644 Yavsc/Controllers/Communicating/LiveController.cs create mode 100644 Yavsc/ViewModels/Streaming/LiveCastMeta.cs diff --git a/Yavsc/Controllers/Communicating/LiveController.cs b/Yavsc/Controllers/Communicating/LiveController.cs new file mode 100644 index 00000000..bd1e7300 --- /dev/null +++ b/Yavsc/Controllers/Communicating/LiveController.cs @@ -0,0 +1,83 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Net.WebSockets; +using System.Security.Claims; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNet.Mvc; +using Microsoft.Extensions.Logging; +using Yavsc.ViewModels.Streaming; + +namespace Yavsc.Controllers.Communicating +{ + public class LiveController : Controller + { + ILogger _logger; + public static ConcurrentDictionary Casters = new ConcurrentDictionary(); + public LiveController(LoggerFactory loggerFactory) + { + _logger = loggerFactory.CreateLogger(); + } + + public async Task Cast() + { + var uname = User.GetUserName(); + // ensure this request is for a websocket + if (!HttpContext.WebSockets.IsWebSocketRequest) return new BadRequestResult(); + // ensure uniqueness of casting stream from this user + var existent = Casters[uname]; + if (existent != null) return new BadRequestObjectResult("not supported, you already casting, there's support for one live streaming only"); + var meta = new LiveCastMeta { Socket = await HttpContext.WebSockets.AcceptWebSocketAsync() }; + + using (meta.Socket) + { + if (meta.Socket != null && meta.Socket.State == WebSocketState.Open) + { + Casters[uname] = meta; + // TODO: Handle the socket here. + // Find receivers: others in the chat room + // send them the flow + + byte[] buffer = new byte[1024]; + WebSocketReceiveResult received = await meta.Socket.ReceiveAsync + (new ArraySegment(buffer), CancellationToken.None); + + // FIXME do we really need to close those one in invalid state ? + Stack ToClose = new Stack(); + + while (received.MessageType != WebSocketMessageType.Close) + { + _logger.LogInformation($"Echoing {received.Count} bytes received in a {received.MessageType} message; Fin={received.EndOfMessage}"); + // Echo anything we receive + // and send to all listner found + foreach (var cliItem in meta.Listeners) + { + var listenningSocket = cliItem.Value; + if (listenningSocket.State == WebSocketState.Open) + await listenningSocket.SendAsync(new ArraySegment + (buffer, 0, received.Count), received.MessageType, received.EndOfMessage, CancellationToken.None); + else ToClose.Push(cliItem.Key); + } + received = await meta.Socket.ReceiveAsync(new ArraySegment(buffer), CancellationToken.None); + + string no; + do + { + no = ToClose.Pop(); + WebSocket listenningSocket; + if (meta.Listeners.TryRemove(no, out listenningSocket)) + await listenningSocket.CloseAsync(WebSocketCloseStatus.EndpointUnavailable, "State != WebSocketState.Open", CancellationToken.None); + + } while (no != null); + } + await meta.Socket.CloseAsync(received.CloseStatus.Value, received.CloseStatusDescription, CancellationToken.None); + Casters[uname] = null; + } + else _logger.LogInformation($"failed (meta.Socket != null && meta.Socket.State == WebSocketState.Open)"); + } + + return Ok(); + } + } +} \ No newline at end of file diff --git a/Yavsc/Startup/Startup.WebSockets.cs b/Yavsc/Startup/Startup.WebSockets.cs index ecd72761..0c5d746f 100644 --- a/Yavsc/Startup/Startup.WebSockets.cs +++ b/Yavsc/Startup/Startup.WebSockets.cs @@ -1,111 +1,15 @@ -using System; -using System.Collections.Concurrent; -using System.Net.WebSockets; -using System.Threading; using Microsoft.AspNet.Builder; using Microsoft.AspNet.Hosting; -using Yavsc.Extensions; namespace Yavsc { public partial class Startup { - public static ConcurrentBag Listeners = new ConcurrentBag(); public void ConfigureWebSocketsApp(IApplicationBuilder app, SiteSettings siteSettings, IHostingEnvironment env) { app.UseWebSockets(); app.UseSignalR("/api/signalr"); - app.UseWhen(context => context.Request.Path.StartsWithSegments("/ws"), - branch => - { - branch.Use( - async (http, next) => - { - if (http.WebSockets.IsWebSocketRequest) - { - WebSocket webSocket = null; - if (!Listeners.TryPeek(out webSocket)) - { - webSocket = await http.WebSockets.AcceptWebSocketAsync(); - Listeners.Add(webSocket); - } - using (webSocket) - { - if (webSocket != null && webSocket.State == WebSocketState.Open) - { - // TODO: Handle the socket here. - // Find receivers: others in the chat room - // send them the flow - - byte[] buffer = new byte[1024]; - WebSocketReceiveResult received = await webSocket.ReceiveAsync(new ArraySegment(buffer), CancellationToken.None); - while (received.MessageType != WebSocketMessageType.Close) - { - Console.WriteLine($"Echoing {received.Count} bytes received in a {received.MessageType} message; Fin={received.EndOfMessage}"); - // Echo anything we receive - await webSocket.SendAsync(new ArraySegment(buffer, 0, received.Count), received.MessageType, received.EndOfMessage, CancellationToken.None); - - received = await webSocket.ReceiveAsync(new ArraySegment(buffer), CancellationToken.None); - } - await webSocket.CloseAsync(received.CloseStatus.Value, received.CloseStatusDescription, CancellationToken.None); - - } - - } - } - else - { - // Nothing to do here, pass downstream. - await next(); - } - } - ); - } - ); - -/* - var _sockets = new ConcurrentBag(); - - app.Use( - async (http, next) => - { - if (http.WebSockets.IsWebSocketRequest) - { - WebSocket webSocket = null; - if (!_sockets.TryPeek(out webSocket)) - { - webSocket = await http.WebSockets.AcceptWebSocketAsync(); - _sockets.Add(webSocket); - } - using (webSocket) - { - if (webSocket != null && webSocket.State == WebSocketState.Open) - { - // TODO: Handle the socket here. - byte[] buffer = new byte[1024]; - WebSocketReceiveResult received = await webSocket.ReceiveAsync(new ArraySegment(buffer), CancellationToken.None); - while (received.MessageType != WebSocketMessageType.Close) - { - Console.WriteLine($"Echoing {received.Count} bytes received in a {received.MessageType} message; Fin={received.EndOfMessage}"); - // Echo anything we receive - await webSocket.SendAsync(new ArraySegment(buffer, 0, received.Count), received.MessageType, received.EndOfMessage, CancellationToken.None); - - received = await webSocket.ReceiveAsync(new ArraySegment(buffer), CancellationToken.None); - } - await webSocket.CloseAsync(received.CloseStatus.Value, received.CloseStatusDescription, CancellationToken.None); - - } - - } - } - else - { - // Nothing to do here, pass downstream. - await next(); - } - } - ); */ } } } \ No newline at end of file diff --git a/Yavsc/ViewModels/Streaming/LiveCastMeta.cs b/Yavsc/ViewModels/Streaming/LiveCastMeta.cs new file mode 100644 index 00000000..4ec0cd6a --- /dev/null +++ b/Yavsc/ViewModels/Streaming/LiveCastMeta.cs @@ -0,0 +1,16 @@ +using System.Collections.Concurrent; +using System.Net.WebSockets; + +namespace Yavsc.ViewModels.Streaming +{ + public class LiveCastClient { + public string UserName { get; set; } + public WebSocket Socket { get; set; } + } + + public class LiveCastMeta + { + public WebSocket Socket { get; set; } + public ConcurrentDictionary Listeners { get; set; } = new ConcurrentDictionary(); + } +} \ No newline at end of file