initiating a live casting

vnext
Paul Schneider 6 years ago
parent c00ba49281
commit 5d378da97a
3 changed files with 99 additions and 96 deletions

@ -0,0 +1,83 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Net.WebSockets;
using System.Security.Claims;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNet.Mvc;
using Microsoft.Extensions.Logging;
using Yavsc.ViewModels.Streaming;
namespace Yavsc.Controllers.Communicating
{
public class LiveController : Controller
{
ILogger _logger;
public static ConcurrentDictionary<string, LiveCastMeta> Casters = new ConcurrentDictionary<string, LiveCastMeta>();
public LiveController(LoggerFactory loggerFactory)
{
_logger = loggerFactory.CreateLogger<LiveController>();
}
public async Task<IActionResult> Cast()
{
var uname = User.GetUserName();
// ensure this request is for a websocket
if (!HttpContext.WebSockets.IsWebSocketRequest) return new BadRequestResult();
// ensure uniqueness of casting stream from this user
var existent = Casters[uname];
if (existent != null) return new BadRequestObjectResult("not supported, you already casting, there's support for one live streaming only");
var meta = new LiveCastMeta { Socket = await HttpContext.WebSockets.AcceptWebSocketAsync() };
using (meta.Socket)
{
if (meta.Socket != null && meta.Socket.State == WebSocketState.Open)
{
Casters[uname] = meta;
// TODO: Handle the socket here.
// Find receivers: others in the chat room
// send them the flow
byte[] buffer = new byte[1024];
WebSocketReceiveResult received = await meta.Socket.ReceiveAsync
(new ArraySegment<byte>(buffer), CancellationToken.None);
// FIXME do we really need to close those one in invalid state ?
Stack<string> ToClose = new Stack<string>();
while (received.MessageType != WebSocketMessageType.Close)
{
_logger.LogInformation($"Echoing {received.Count} bytes received in a {received.MessageType} message; Fin={received.EndOfMessage}");
// Echo anything we receive
// and send to all listner found
foreach (var cliItem in meta.Listeners)
{
var listenningSocket = cliItem.Value;
if (listenningSocket.State == WebSocketState.Open)
await listenningSocket.SendAsync(new ArraySegment<byte>
(buffer, 0, received.Count), received.MessageType, received.EndOfMessage, CancellationToken.None);
else ToClose.Push(cliItem.Key);
}
received = await meta.Socket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
string no;
do
{
no = ToClose.Pop();
WebSocket listenningSocket;
if (meta.Listeners.TryRemove(no, out listenningSocket))
await listenningSocket.CloseAsync(WebSocketCloseStatus.EndpointUnavailable, "State != WebSocketState.Open", CancellationToken.None);
} while (no != null);
}
await meta.Socket.CloseAsync(received.CloseStatus.Value, received.CloseStatusDescription, CancellationToken.None);
Casters[uname] = null;
}
else _logger.LogInformation($"failed (meta.Socket != null && meta.Socket.State == WebSocketState.Open)");
}
return Ok();
}
}
}

@ -1,111 +1,15 @@
using System;
using System.Collections.Concurrent;
using System.Net.WebSockets;
using System.Threading;
using Microsoft.AspNet.Builder;
using Microsoft.AspNet.Hosting;
using Yavsc.Extensions;
namespace Yavsc
{
public partial class Startup
{
public static ConcurrentBag<WebSocket> Listeners = new ConcurrentBag<WebSocket>();
public void ConfigureWebSocketsApp(IApplicationBuilder app,
SiteSettings siteSettings, IHostingEnvironment env)
{
app.UseWebSockets();
app.UseSignalR("/api/signalr");
app.UseWhen(context => context.Request.Path.StartsWithSegments("/ws"),
branch =>
{
branch.Use(
async (http, next) =>
{
if (http.WebSockets.IsWebSocketRequest)
{
WebSocket webSocket = null;
if (!Listeners.TryPeek(out webSocket))
{
webSocket = await http.WebSockets.AcceptWebSocketAsync();
Listeners.Add(webSocket);
}
using (webSocket)
{
if (webSocket != null && webSocket.State == WebSocketState.Open)
{
// TODO: Handle the socket here.
// Find receivers: others in the chat room
// send them the flow
byte[] buffer = new byte[1024];
WebSocketReceiveResult received = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
while (received.MessageType != WebSocketMessageType.Close)
{
Console.WriteLine($"Echoing {received.Count} bytes received in a {received.MessageType} message; Fin={received.EndOfMessage}");
// Echo anything we receive
await webSocket.SendAsync(new ArraySegment<byte>(buffer, 0, received.Count), received.MessageType, received.EndOfMessage, CancellationToken.None);
received = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
}
await webSocket.CloseAsync(received.CloseStatus.Value, received.CloseStatusDescription, CancellationToken.None);
}
}
}
else
{
// Nothing to do here, pass downstream.
await next();
}
}
);
}
);
/*
var _sockets = new ConcurrentBag<WebSocket>();
app.Use(
async (http, next) =>
{
if (http.WebSockets.IsWebSocketRequest)
{
WebSocket webSocket = null;
if (!_sockets.TryPeek(out webSocket))
{
webSocket = await http.WebSockets.AcceptWebSocketAsync();
_sockets.Add(webSocket);
}
using (webSocket)
{
if (webSocket != null && webSocket.State == WebSocketState.Open)
{
// TODO: Handle the socket here.
byte[] buffer = new byte[1024];
WebSocketReceiveResult received = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
while (received.MessageType != WebSocketMessageType.Close)
{
Console.WriteLine($"Echoing {received.Count} bytes received in a {received.MessageType} message; Fin={received.EndOfMessage}");
// Echo anything we receive
await webSocket.SendAsync(new ArraySegment<byte>(buffer, 0, received.Count), received.MessageType, received.EndOfMessage, CancellationToken.None);
received = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
}
await webSocket.CloseAsync(received.CloseStatus.Value, received.CloseStatusDescription, CancellationToken.None);
}
}
}
else
{
// Nothing to do here, pass downstream.
await next();
}
}
); */
}
}
}

@ -0,0 +1,16 @@
using System.Collections.Concurrent;
using System.Net.WebSockets;
namespace Yavsc.ViewModels.Streaming
{
public class LiveCastClient {
public string UserName { get; set; }
public WebSocket Socket { get; set; }
}
public class LiveCastMeta
{
public WebSocket Socket { get; set; }
public ConcurrentDictionary<string, WebSocket> Listeners { get; set; } = new ConcurrentDictionary<string, WebSocket>();
}
}
Loading…