|
|
@ -21,14 +21,15 @@ using Newtonsoft.Json;
|
|
|
|
namespace Yavsc.Services
|
|
|
|
namespace Yavsc.Services
|
|
|
|
{
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
|
|
public class LiveProcessor : ILiveProcessor {
|
|
|
|
public class LiveProcessor : ILiveProcessor
|
|
|
|
IHubContext _hubContext;
|
|
|
|
{
|
|
|
|
private ILogger _logger;
|
|
|
|
readonly IHubContext _hubContext;
|
|
|
|
ApplicationDbContext _dbContext;
|
|
|
|
private readonly ILogger _logger;
|
|
|
|
public PathString LiveCastingPath {get; set;} = Constants.LivePath;
|
|
|
|
readonly ApplicationDbContext _dbContext;
|
|
|
|
|
|
|
|
public PathString LiveCastingPath { get; set; } = Constants.LivePath;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public ConcurrentDictionary<string, LiveCastHandler> Casters {get;} = new ConcurrentDictionary<string, LiveCastHandler>();
|
|
|
|
public ConcurrentDictionary<string, LiveCastHandler> Casters { get; } = new ConcurrentDictionary<string, LiveCastHandler>();
|
|
|
|
|
|
|
|
|
|
|
|
public LiveProcessor(ApplicationDbContext dbContext, ILoggerFactory loggerFactory)
|
|
|
|
public LiveProcessor(ApplicationDbContext dbContext, ILoggerFactory loggerFactory)
|
|
|
|
{
|
|
|
|
{
|
|
|
@ -37,7 +38,7 @@ namespace Yavsc.Services
|
|
|
|
_logger = loggerFactory.CreateLogger<LiveProcessor>();
|
|
|
|
_logger = loggerFactory.CreateLogger<LiveProcessor>();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public async Task<bool> AcceptStream (HttpContext context)
|
|
|
|
public async Task<bool> AcceptStream(HttpContext context)
|
|
|
|
{
|
|
|
|
{
|
|
|
|
// TODO defer request handling
|
|
|
|
// TODO defer request handling
|
|
|
|
var liveId = long.Parse(context.Request.Path.Value.Substring(LiveCastingPath.Value.Length + 1));
|
|
|
|
var liveId = long.Parse(context.Request.Path.Value.Substring(LiveCastingPath.Value.Length + 1));
|
|
|
@ -51,7 +52,7 @@ namespace Yavsc.Services
|
|
|
|
context.Response.StatusCode = 400;
|
|
|
|
context.Response.StatusCode = 400;
|
|
|
|
return false;
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
_logger.LogInformation("flow : "+flow.Title+" for "+uname);
|
|
|
|
_logger.LogInformation("flow : " + flow.Title + " for " + uname);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
LiveCastHandler liveHandler = null;
|
|
|
|
LiveCastHandler liveHandler = null;
|
|
|
@ -59,14 +60,15 @@ namespace Yavsc.Services
|
|
|
|
{
|
|
|
|
{
|
|
|
|
_logger.LogWarning($"Casters.ContainsKey({uname})");
|
|
|
|
_logger.LogWarning($"Casters.ContainsKey({uname})");
|
|
|
|
liveHandler = Casters[uname];
|
|
|
|
liveHandler = Casters[uname];
|
|
|
|
if (liveHandler.Socket.State == WebSocketState.Open || liveHandler.Socket.State == WebSocketState.Connecting )
|
|
|
|
if (liveHandler.Socket.State == WebSocketState.Open || liveHandler.Socket.State == WebSocketState.Connecting)
|
|
|
|
{
|
|
|
|
{
|
|
|
|
_logger.LogWarning($"Closing cx");
|
|
|
|
_logger.LogWarning($"Closing cx");
|
|
|
|
// FIXME loosed connexion should be detected & disposed else where
|
|
|
|
// FIXME loosed connexion should be detected & disposed else where
|
|
|
|
await liveHandler.Socket.CloseAsync( WebSocketCloseStatus.EndpointUnavailable, "one by user", CancellationToken.None);
|
|
|
|
await liveHandler.Socket.CloseAsync(WebSocketCloseStatus.EndpointUnavailable, "one by user", CancellationToken.None);
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (!liveHandler.TokenSource.IsCancellationRequested) {
|
|
|
|
if (!liveHandler.TokenSource.IsCancellationRequested)
|
|
|
|
|
|
|
|
{
|
|
|
|
liveHandler.TokenSource.Cancel();
|
|
|
|
liveHandler.TokenSource.Cancel();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
liveHandler.Socket.Dispose();
|
|
|
|
liveHandler.Socket.Dispose();
|
|
|
@ -90,14 +92,13 @@ namespace Yavsc.Services
|
|
|
|
// TODO: Handle the socket here.
|
|
|
|
// TODO: Handle the socket here.
|
|
|
|
// Find receivers: others in the chat room
|
|
|
|
// Find receivers: others in the chat room
|
|
|
|
// send them the flow
|
|
|
|
// send them the flow
|
|
|
|
var buffer = new byte[Constants.WebSocketsMaxBufLen+16];
|
|
|
|
var buffer = new byte[Constants.WebSocketsMaxBufLen];
|
|
|
|
var sBuffer = new ArraySegment<byte>(buffer);
|
|
|
|
var sBuffer = new ArraySegment<byte>(buffer);
|
|
|
|
_logger.LogInformation("Receiving bytes...");
|
|
|
|
_logger.LogInformation("Receiving bytes...");
|
|
|
|
|
|
|
|
|
|
|
|
WebSocketReceiveResult received = await liveHandler.Socket.ReceiveAsync(sBuffer, liveHandler.TokenSource.Token);
|
|
|
|
WebSocketReceiveResult received = await liveHandler.Socket.ReceiveAsync(sBuffer, liveHandler.TokenSource.Token);
|
|
|
|
int count = (received.Count<4)? 0 : buffer[0]*256*1024 +buffer[1]*1024+buffer[2]*256 + buffer[3];
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_logger.LogInformation($"Received bytes : {count}");
|
|
|
|
_logger.LogInformation($"Received bytes : {received.Count}");
|
|
|
|
_logger.LogInformation($"Is the end : {received.EndOfMessage}");
|
|
|
|
_logger.LogInformation($"Is the end : {received.EndOfMessage}");
|
|
|
|
const string livePath = "live";
|
|
|
|
const string livePath = "live";
|
|
|
|
|
|
|
|
|
|
|
@ -107,16 +108,17 @@ namespace Yavsc.Services
|
|
|
|
string fileName = flow.GetFileName();
|
|
|
|
string fileName = flow.GetFileName();
|
|
|
|
FileInfo destFileInfo = new FileInfo(Path.Combine(destDir, fileName));
|
|
|
|
FileInfo destFileInfo = new FileInfo(Path.Combine(destDir, fileName));
|
|
|
|
// this should end :-)
|
|
|
|
// this should end :-)
|
|
|
|
while (destFileInfo.Exists) {
|
|
|
|
while (destFileInfo.Exists)
|
|
|
|
|
|
|
|
{
|
|
|
|
flow.SequenceNumber++;
|
|
|
|
flow.SequenceNumber++;
|
|
|
|
fileName = flow.GetFileName();
|
|
|
|
fileName = flow.GetFileName();
|
|
|
|
destFileInfo = new FileInfo(Path.Combine(destDir, fileName));
|
|
|
|
destFileInfo = new FileInfo(Path.Combine(destDir, fileName));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
var fsInputQueue = new Queue<ArraySegment<byte>>();
|
|
|
|
var fsInputQueue = new Queue<ArraySegment<byte>>();
|
|
|
|
|
|
|
|
|
|
|
|
bool endOfInput=false;
|
|
|
|
bool endOfInput = false;
|
|
|
|
fsInputQueue.Enqueue(sBuffer);
|
|
|
|
fsInputQueue.Enqueue(sBuffer);
|
|
|
|
var taskWritingToFs = liveHandler.ReceiveUserFile(user, _logger, destDir, fsInputQueue, fileName, flow.MediaType, ()=> endOfInput);
|
|
|
|
var taskWritingToFs = liveHandler.ReceiveUserFile(user, _logger, destDir, fsInputQueue, fileName, flow.MediaType, () => endOfInput);
|
|
|
|
var hubContext = GlobalHost.ConnectionManager.GetHubContext<ChatHub>();
|
|
|
|
var hubContext = GlobalHost.ConnectionManager.GetHubContext<ChatHub>();
|
|
|
|
|
|
|
|
|
|
|
|
hubContext.Clients.All.addPublicStream(new PublicStreamInfo
|
|
|
|
hubContext.Clients.All.addPublicStream(new PublicStreamInfo
|
|
|
@ -132,15 +134,18 @@ namespace Yavsc.Services
|
|
|
|
|
|
|
|
|
|
|
|
try
|
|
|
|
try
|
|
|
|
{
|
|
|
|
{
|
|
|
|
do {
|
|
|
|
do
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
|
|
_logger.LogInformation($"Echoing {received.Count} bytes received in a {received.MessageType} message; Fin={received.EndOfMessage}");
|
|
|
|
_logger.LogInformation($"Echoing {received.Count} bytes received in a {received.MessageType} message; Fin={received.EndOfMessage}");
|
|
|
|
// Echo anything we receive
|
|
|
|
// Echo anything we receive
|
|
|
|
// and send to all listner found
|
|
|
|
// and send to all listner found
|
|
|
|
|
|
|
|
_logger.LogInformation($"{liveHandler.Listeners.Count} listeners");
|
|
|
|
foreach (var cliItem in liveHandler.Listeners)
|
|
|
|
foreach (var cliItem in liveHandler.Listeners)
|
|
|
|
{
|
|
|
|
{
|
|
|
|
var listenningSocket = cliItem.Value;
|
|
|
|
var listenningSocket = cliItem.Value;
|
|
|
|
if (listenningSocket.State == WebSocketState.Open) {
|
|
|
|
if (listenningSocket.State == WebSocketState.Open)
|
|
|
|
|
|
|
|
{
|
|
|
|
_logger.LogInformation(cliItem.Key);
|
|
|
|
_logger.LogInformation(cliItem.Key);
|
|
|
|
await listenningSocket.SendAsync(
|
|
|
|
await listenningSocket.SendAsync(
|
|
|
|
sBuffer, received.MessageType, received.EndOfMessage, liveHandler.TokenSource.Token);
|
|
|
|
sBuffer, received.MessageType, received.EndOfMessage, liveHandler.TokenSource.Token);
|
|
|
@ -150,43 +155,47 @@ namespace Yavsc.Services
|
|
|
|
ToClose.Push(cliItem.Key);
|
|
|
|
ToClose.Push(cliItem.Key);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
buffer = new byte[Constants.WebSocketsMaxBufLen+16];
|
|
|
|
|
|
|
|
|
|
|
|
if (!received.CloseStatus.HasValue)
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
_logger.LogInformation("try and receive new bytes");
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
buffer = new byte[Constants.WebSocketsMaxBufLen];
|
|
|
|
sBuffer = new ArraySegment<byte>(buffer);
|
|
|
|
sBuffer = new ArraySegment<byte>(buffer);
|
|
|
|
received = await liveHandler.Socket.ReceiveAsync(sBuffer, liveHandler.TokenSource.Token);
|
|
|
|
received = await liveHandler.Socket.ReceiveAsync(sBuffer, liveHandler.TokenSource.Token);
|
|
|
|
count = (received.Count<4)? 0 : buffer[0]*256*1024 +buffer[1]*1024+buffer[2]*256 + buffer[3];
|
|
|
|
|
|
|
|
_logger.LogInformation($"Received bytes : {count}");
|
|
|
|
_logger.LogInformation($"Received bytes : {received.Count}");
|
|
|
|
_logger.LogInformation($"Is the end : {received.EndOfMessage}");
|
|
|
|
_logger.LogInformation($"Is the end : {received.EndOfMessage}");
|
|
|
|
if (received.Count<=4 || count > Constants.WebSocketsMaxBufLen) {
|
|
|
|
fsInputQueue.Enqueue(sBuffer);
|
|
|
|
if (received.CloseStatus.HasValue) {
|
|
|
|
if (received.CloseStatus.HasValue)
|
|
|
|
_logger.LogInformation($"received a close status: {received.CloseStatus.Value.ToString()}: {received.CloseStatusDescription}");
|
|
|
|
{
|
|
|
|
}
|
|
|
|
endOfInput=true;
|
|
|
|
else {
|
|
|
|
_logger.LogInformation($"received a close status: {received.CloseStatus.Value}: {received.CloseStatusDescription}");
|
|
|
|
_logger.LogError("Wrong packet size: "+count.ToString());
|
|
|
|
|
|
|
|
_logger.LogError(JsonConvert.SerializeObject(received));
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
else fsInputQueue.Enqueue(sBuffer);
|
|
|
|
else endOfInput=true;
|
|
|
|
while (ToClose.Count >0)
|
|
|
|
while (ToClose.Count > 0)
|
|
|
|
{
|
|
|
|
{
|
|
|
|
string no = ToClose.Pop();
|
|
|
|
string no = ToClose.Pop();
|
|
|
|
_logger.LogInformation("Closing follower connection");
|
|
|
|
_logger.LogInformation("Closing follower connection");
|
|
|
|
WebSocket listenningSocket;
|
|
|
|
WebSocket listenningSocket;
|
|
|
|
if (liveHandler.Listeners.TryRemove(no, out listenningSocket)) {
|
|
|
|
if (liveHandler.Listeners.TryRemove(no, out listenningSocket))
|
|
|
|
|
|
|
|
{
|
|
|
|
await listenningSocket.CloseAsync(WebSocketCloseStatus.EndpointUnavailable,
|
|
|
|
await listenningSocket.CloseAsync(WebSocketCloseStatus.EndpointUnavailable,
|
|
|
|
"State != WebSocketState.Open", CancellationToken.None);
|
|
|
|
"State != WebSocketState.Open", CancellationToken.None);
|
|
|
|
listenningSocket.Dispose();
|
|
|
|
listenningSocket.Dispose();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
while (!received.CloseStatus.HasValue);
|
|
|
|
while (liveHandler.Socket.State == WebSocketState.Open);
|
|
|
|
|
|
|
|
|
|
|
|
_logger.LogInformation("Closing connection");
|
|
|
|
_logger.LogInformation("Closing connection");
|
|
|
|
endOfInput=true;
|
|
|
|
|
|
|
|
taskWritingToFs.Wait();
|
|
|
|
taskWritingToFs.Wait();
|
|
|
|
await liveHandler.Socket.CloseAsync(WebSocketCloseStatus.NormalClosure, received.CloseStatusDescription,liveHandler.TokenSource.Token );
|
|
|
|
await liveHandler.Socket.CloseAsync(WebSocketCloseStatus.NormalClosure, received.CloseStatusDescription, liveHandler.TokenSource.Token);
|
|
|
|
|
|
|
|
|
|
|
|
liveHandler.TokenSource.Cancel();
|
|
|
|
liveHandler.TokenSource.Cancel();
|
|
|
|
liveHandler.Dispose();
|
|
|
|
liveHandler.Dispose();
|
|
|
|
_logger.LogInformation("Resulting file : " +JsonConvert.SerializeObject(taskWritingToFs.Result));
|
|
|
|
_logger.LogInformation("Resulting file : " + JsonConvert.SerializeObject(taskWritingToFs.Result));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
catch (Exception ex)
|
|
|
|
catch (Exception ex)
|
|
|
|
{
|
|
|
|
{
|
|
|
@ -203,7 +212,7 @@ namespace Yavsc.Services
|
|
|
|
// not (meta.Socket != null && meta.Socket.State == WebSocketState.Open)
|
|
|
|
// not (meta.Socket != null && meta.Socket.State == WebSocketState.Open)
|
|
|
|
if (liveHandler.Socket != null)
|
|
|
|
if (liveHandler.Socket != null)
|
|
|
|
{
|
|
|
|
{
|
|
|
|
_logger.LogError($"meta.Socket.State not Open: {liveHandler.Socket.State.ToString()} ");
|
|
|
|
_logger.LogError($"meta.Socket.State not Open: {liveHandler.Socket.State} ");
|
|
|
|
liveHandler.Socket.Dispose();
|
|
|
|
liveHandler.Socket.Dispose();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
else
|
|
|
|
else
|
|
|
|