fixes live info & filename url

main
Paul Schneider 7 years ago
parent 64832d6fef
commit e52f813ed2
2 changed files with 51 additions and 20 deletions

@ -247,14 +247,17 @@ public static FileRecievedInfo ReceiveProSignature(this ClaimsPrincipal user, st
public static string GetFileUrl (this LiveFlow flow) public static string GetFileUrl (this LiveFlow flow)
{ {
if (flow.DifferedFileName==null) if (flow.DifferedFileName==null) return null;
// no server-side backup for this stream // no server-side backup for this stream
return null; return $"{Startup.UserFilesOptions.RequestPath}/{flow.Owner.UserName}/live/"+GetFileName(flow);
}
public static string GetFileName (this LiveFlow flow)
{
var fileInfo = new FileInfo(flow.DifferedFileName); var fileInfo = new FileInfo(flow.DifferedFileName);
var ext = fileInfo.Extension; var ext = fileInfo.Extension;
var namelen = flow.DifferedFileName.Length - ext.Length; var namelen = flow.DifferedFileName.Length - ext.Length;
var basename = flow.DifferedFileName.Substring(0,namelen); var basename = flow.DifferedFileName.Substring(0,namelen);
return $"{Startup.UserFilesOptions.RequestPath}/{flow.Owner.UserName}/live/{basename}-{flow.SequenceNumber}{ext}"; return $"{basename}-{flow.SequenceNumber}{ext}";
} }
} }
} }

@ -15,6 +15,8 @@ using Yavsc.Helpers;
using Yavsc.Models; using Yavsc.Models;
using Yavsc.ViewModels.Streaming; using Yavsc.ViewModels.Streaming;
using Yavsc.Models.Messaging; using Yavsc.Models.Messaging;
using Yavsc.Models.FileSystem;
using Newtonsoft.Json;
namespace Yavsc.Services namespace Yavsc.Services
{ {
@ -53,9 +55,11 @@ namespace Yavsc.Services
LiveCastHandler meta = null; LiveCastHandler meta = null;
if (Casters.ContainsKey(uname)) if (Casters.ContainsKey(uname))
{ {
_logger.LogWarning($"Casters.ContainsKey({uname})");
meta = Casters[uname]; meta = Casters[uname];
if (meta.Socket.State == WebSocketState.Open || meta.Socket.State == WebSocketState.Connecting ) if (meta.Socket.State == WebSocketState.Open || meta.Socket.State == WebSocketState.Connecting )
{ {
_logger.LogWarning($"Closing cx");
// FIXME loosed connexion should be detected & disposed else where // FIXME loosed connexion should be detected & disposed else where
await meta.Socket.CloseAsync( WebSocketCloseStatus.EndpointUnavailable, "one by user", CancellationToken.None); await meta.Socket.CloseAsync( WebSocketCloseStatus.EndpointUnavailable, "one by user", CancellationToken.None);
@ -69,6 +73,7 @@ namespace Yavsc.Services
} }
else else
{ {
_logger.LogInformation($"new caster");
// Accept the socket // Accept the socket
meta = new LiveCastHandler { Socket = await context.WebSockets.AcceptWebSocketAsync() }; meta = new LiveCastHandler { Socket = await context.WebSockets.AcceptWebSocketAsync() };
} }
@ -90,13 +95,13 @@ namespace Yavsc.Services
WebSocketReceiveResult received = await meta.Socket.ReceiveAsync(sBuffer, meta.TokenSource.Token); WebSocketReceiveResult received = await meta.Socket.ReceiveAsync(sBuffer, meta.TokenSource.Token);
_logger.LogInformation($"Received bytes : {received.Count}"); _logger.LogInformation($"Received bytes : {received.Count}");
_logger.LogInformation($"Is the end : {received.EndOfMessage}"); _logger.LogInformation($"Is the end : {received.EndOfMessage}");
const string livePath = "live";
string destDir = context.User.InitPostToFileSystem("live"); string destDir = context.User.InitPostToFileSystem(livePath);
_logger.LogInformation($"Saving flow to {destDir}"); _logger.LogInformation($"Saving flow to {destDir}");
string fileName = Path.Combine(destDir, flow.DifferedFileName); string fileName = flow.GetFileName();
FileInfo iFile = new FileInfo(fileName); var fsInputQueue = new Queue<ArraySegment<byte>>();
var fsio = new Queue<ArraySegment<byte>>(); var taskWritingToFs = Task<FileRecievedInfo>.Run( ()=> user.ReceiveUserFile(destDir, fsInputQueue, fileName, flow.MediaType, meta.TokenSource.Token));
var toFs = Task.Run( ()=> user.ReceiveUserFile(destDir, fsio, flow.DifferedFileName, flow.MediaType, meta.TokenSource.Token));
var hubContext = GlobalHost.ConnectionManager.GetHubContext<ChatHub>(); var hubContext = GlobalHost.ConnectionManager.GetHubContext<ChatHub>();
hubContext.Clients.All.addPublicStream(new PublicStreamInfo hubContext.Clients.All.addPublicStream(new PublicStreamInfo
@ -130,7 +135,7 @@ namespace Yavsc.Services
ToClose.Push(cliItem.Key); ToClose.Push(cliItem.Key);
} }
} }
fsio.Enqueue(sBuffer); fsInputQueue.Enqueue(sBuffer);
// logger.LogInformation("replying..."); // logger.LogInformation("replying...");
while (!received.CloseStatus.HasValue) while (!received.CloseStatus.HasValue)
{ {
@ -155,7 +160,7 @@ namespace Yavsc.Services
ToClose.Push(cliItem.Key); ToClose.Push(cliItem.Key);
} }
} }
fsio.Enqueue(sBuffer); fsInputQueue.Enqueue(sBuffer);
_logger.LogInformation($"Received new bytes : {received.Count}"); _logger.LogInformation($"Received new bytes : {received.Count}");
_logger.LogInformation($"Is the end : {received.EndOfMessage}"); _logger.LogInformation($"Is the end : {received.EndOfMessage}");
while (ToClose.Count >0) while (ToClose.Count >0)
@ -163,26 +168,36 @@ namespace Yavsc.Services
string no = ToClose.Pop(); string no = ToClose.Pop();
_logger.LogInformation("Closing follower connection"); _logger.LogInformation("Closing follower connection");
WebSocket listenningSocket; WebSocket listenningSocket;
if (meta.Listeners.TryRemove(no, out listenningSocket)) if (meta.Listeners.TryRemove(no, out listenningSocket)) {
await listenningSocket.CloseAsync(WebSocketCloseStatus.EndpointUnavailable, "State != WebSocketState.Open", CancellationToken.None); await listenningSocket.CloseAsync(WebSocketCloseStatus.EndpointUnavailable,
"State != WebSocketState.Open", CancellationToken.None);
listenningSocket.Dispose();
}
} }
} }
_logger.LogInformation("Closing connection"); _logger.LogInformation("Closing connection");
await meta.Socket.CloseAsync(received.CloseStatus.Value, received.CloseStatusDescription, CancellationToken.None); await meta.Socket.CloseAsync(received.CloseStatus.Value, received.CloseStatusDescription, CancellationToken.None);
meta.Socket.Dispose();
meta.TokenSource.Cancel(); meta.TokenSource.Cancel();
Casters[uname] = null; taskWritingToFs.Wait();
_logger.LogInformation("Resulting file : " +JsonConvert.SerializeObject(taskWritingToFs.Result));
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError($"Exception occured : {ex.Message}"); _logger.LogError($"Exception occured : {ex.Message}");
_logger.LogError(ex.StackTrace); _logger.LogError(ex.StackTrace);
await meta.Socket.CloseAsync(received.CloseStatus.Value, "exception occured", CancellationToken.None);
meta.Socket.Dispose(); meta.Socket.Dispose();
Casters[uname] = null; meta.TokenSource.Cancel();
} }
taskWritingToFs.Dispose();
} }
else else
{ // not meta.Socket != null && meta.Socket.State == WebSocketState.Open {
// Socket was not accepted open ...
// not (meta.Socket != null && meta.Socket.State == WebSocketState.Open)
if (meta.Socket != null) if (meta.Socket != null)
{ {
_logger.LogError($"meta.Socket.State not Open: {meta.Socket.State.ToString()} "); _logger.LogError($"meta.Socket.State not Open: {meta.Socket.State.ToString()} ");
@ -191,6 +206,9 @@ namespace Yavsc.Services
else else
_logger.LogError("socket object is null"); _logger.LogError("socket object is null");
} }
RemoveLiveInfo(uname);
} }
catch (IOException ex) catch (IOException ex)
{ {
@ -201,12 +219,22 @@ namespace Yavsc.Services
else else
{ {
_logger.LogError($"Really unexpected end of stream"); _logger.LogError($"Really unexpected end of stream");
} await meta.Socket?.CloseAsync(WebSocketCloseStatus.EndpointUnavailable, ex.Message, CancellationToken.None);
await meta.Socket?.CloseAsync(WebSocketCloseStatus.EndpointUnavailable, ex.Message, CancellationToken.None); }
meta.Socket?.Dispose(); meta.Socket?.Dispose();
Casters[uname] = null;
RemoveLiveInfo(uname);
} }
return true; return true;
} }
void RemoveLiveInfo(string userName)
{
LiveCastHandler caster;
if (Casters.TryRemove(userName, out caster))
_logger.LogInformation("removed live info");
else
_logger.LogError("could not remove live info");
}
} }
} }
Loading…