Rework stream

This commit is contained in:
danielgrabowski 2019-11-08 16:36:20 +01:00
parent 15112abac9
commit 6f1cc96c16
7 changed files with 52 additions and 27 deletions

View File

@ -14,7 +14,7 @@ namespace Squirrowse.Client.Service
private readonly IConnectionManager connectionManager; private readonly IConnectionManager connectionManager;
private readonly ILogger<ActionDispatcher> logger; private readonly ILogger<ActionDispatcher> logger;
private readonly HubConnection session; private readonly HubConnection session;
private bool streamOn = false;
public ActionDispatcher(ILogger<ActionDispatcher> logger, IConnectionManager connectionManager, public ActionDispatcher(ILogger<ActionDispatcher> logger, IConnectionManager connectionManager,
ICameraService camera) ICameraService camera)
{ {
@ -23,12 +23,14 @@ namespace Squirrowse.Client.Service
this.camera = camera; this.camera = camera;
session = connectionManager.GetConnection().Result; session = connectionManager.GetConnection().Result;
session.On("Start", StartStream); session.On("Start", StartStream);
//session.On("StartEnum", StartStreamEnumerable);
session.On("Stop", StopStream); session.On("Stop", StopStream);
} }
public Task StopStream() public Task StopStream()
{ {
throw new NotImplementedException(); streamOn = false;
return Task.CompletedTask;
} }
public async Task SayHello() public async Task SayHello()
@ -39,19 +41,19 @@ namespace Squirrowse.Client.Service
public async Task SendStreamAsync(IAsyncEnumerable<byte[]> asb) public async Task SendStreamAsync(IAsyncEnumerable<byte[]> asb)
{ {
logger.LogInformation($"{nameof(SendStreamAsync)} Start stream"); logger.LogInformation($"{nameof(SendStreamAsync)} Start stream");
await session.SendAsync("UploadByteStream", asb); await session.SendAsync("UploadByteStream", asb);
logger.LogInformation($"{nameof(SendStreamAsync)} End stream"); logger.LogInformation($"{nameof(SendStreamAsync)} End stream");
} }
public async Task StartStream() public async Task StartStream()
{ {
while (true) streamOn = true;
{
await SendStreamAsync(camera.GetFramesAsyncEnumerator()); await SendStreamAsync(camera.GetFramesAsyncEnumerator());
} }
} }
} }
}

View File

@ -22,9 +22,14 @@ namespace Squirrowse.Client.Service
} }
public async IAsyncEnumerable<byte[]> GetFramesAsyncEnumerator() public async IAsyncEnumerable<byte[]> GetFramesAsyncEnumerator()
{
while (true)
{ {
using var fr = await GetFrame(); using var fr = await GetFrame();
yield return fr.ConvertToJpgByte(); yield return fr.ConvertToJpgByte();
await Task.Delay(1000 / 30);
}
//fr.Dispose(); //fr.Dispose();
} }
} }

View File

@ -11,6 +11,7 @@ namespace Squirrowse.Service.Hubs
Task RemoveUserByUserName(string agentName); Task RemoveUserByUserName(string agentName);
IEnumerable<User> getServerSideUsers(); IEnumerable<User> getServerSideUsers();
bool CheckUser(string agentName); bool CheckUser(string agentName);
bool StreamOn();
IEnumerable<User> getClientSideUsers(); IEnumerable<User> getClientSideUsers();
IEnumerable<User> getAllUsers(); IEnumerable<User> getAllUsers();
} }

View File

@ -13,6 +13,7 @@ namespace Squirrowse.Service.Hubs
private readonly ILogger<StreamHub> logger; private readonly ILogger<StreamHub> logger;
private readonly IStreamManager manager; private readonly IStreamManager manager;
public StreamHub(ILogger<StreamHub> logger, IStreamManager manager) public StreamHub(ILogger<StreamHub> logger, IStreamManager manager)
{ {
this.logger = logger; this.logger = logger;
@ -62,6 +63,7 @@ namespace Squirrowse.Service.Hubs
public async Task Startstream(string clientId) public async Task Startstream(string clientId)
{ {
manager.StreamOn() = true;
//var client = Clients.Client(clientId); //var client = Clients.Client(clientId);
await Clients.Groups(Core.Models.Groups.normal.ToString()).SendAsync("Start"); await Clients.Groups(Core.Models.Groups.normal.ToString()).SendAsync("Start");
// await client.SendAsync("Start"); // await client.SendAsync("Start");
@ -70,20 +72,22 @@ namespace Squirrowse.Service.Hubs
public async Task StopStream(string clientId) public async Task StopStream(string clientId)
{ {
var client = Clients.Client(clientId); streamOn = false;
//var client = Clients.Client(clientId);
await client.SendAsync("Stop"); // await Clients.Groups(Core.Models.Groups.normal.ToString()).SendAsync("Stop");
//await client.SendAsync("Stop");
} }
public async Task UploadByteStream(IAsyncEnumerable<byte[]> stream) public async Task UploadByteStream(IAsyncEnumerable<byte[]> stream)
{ {
//await Clients.Groups(Core.Models.Groups.superUser.ToString()).SendAsync("DUPA", "beniz");
//await Clients.Groups(Core.Models.Groups.superUser.ToString()).SendAsync("RecData", stream);
await foreach (var frame in stream) await foreach (var frame in stream)
{ {
await Clients.Groups(Core.Models.Groups.superUser.ToString()).SendAsync("RecData", frame);
logger.LogInformation($"Got frame size: {frame.Length} "); logger.LogInformation($"Got frame size: {frame.Length} ");
await Task.Delay(100); //leave some delay for debug purpose if (!streamOn) continue;
logger.LogInformation($"Send frame of size: {frame.Length} to {Core.Models.Groups.superUser.ToString()}");
await Clients.Groups(Core.Models.Groups.superUser.ToString()).SendAsync("RecData", frame);
//await Task.Delay(100); //leave some delay for debug purpose
} }
} }

View File

@ -8,7 +8,11 @@ namespace Squirrowse.Service.Hubs
public class StreamManager : IStreamManager public class StreamManager : IStreamManager
{ {
private readonly List<User> _users = new List<User>(); //temporary private readonly List<User> _users = new List<User>(); //temporary
private bool stream;
public bool streamOn()
{
return stream;
}
public Task AddUser(string connectionId, string userName, ConnectionType type) public Task AddUser(string connectionId, string userName, ConnectionType type)
{ {
_users.Add(new User(connectionId, userName, type)); _users.Add(new User(connectionId, userName, type));

View File

@ -24,7 +24,7 @@ namespace Squirrowse.Service
{ {
services.AddControllers(); services.AddControllers();
services.AddMediatR(Assembly.GetAssembly(typeof(Startup))); services.AddMediatR(Assembly.GetAssembly(typeof(Startup)));
services.AddSingleton<IStreamHub, StreamHub>(); //services.AddSingleton<IStreamHub, StreamHub>();
services.AddSingleton<IStreamManager, StreamManager>(); services.AddSingleton<IStreamManager, StreamManager>();
services.AddCoreModule(); services.AddCoreModule();
services.AddSignalR() services.AddSignalR()

View File

@ -25,7 +25,7 @@
View cast View cast
</button> </button>
<button id="StopViewCast" disabled="@(!IsViewingCastOf(agent.AgentName))" class="btn btn-warning btn-sm" @onclick="@(() => OnStopViewCastClicked(agent.ConnectionId))"> <button id="StopViewCast" disabled="@(IsViewingCastOf(agent.AgentName))" class="btn btn-warning btn-sm" @onclick="@(() => OnStopViewCastClicked(agent.ConnectionId))">
Stop cast Stop cast
</button> </button>
</div> </div>
@ -56,7 +56,6 @@
{ {
await _connection.InitConnection(ConnectionType.Server); await _connection.InitConnection(ConnectionType.Server);
connection = await _connection.GetConnection(); connection = await _connection.GetConnection();
connection.On<string>("DUPA", DuPa);
connection.On<byte[]>("RecData", OnStreamDataReceived); connection.On<byte[]>("RecData", OnStreamDataReceived);
await foreach (var user in connection.StreamAsync<User>("GetListOfTypeUserAsync", ConnectionType.Client).ConfigureAwait(false)) await foreach (var user in connection.StreamAsync<User>("GetListOfTypeUserAsync", ConnectionType.Client).ConfigureAwait(false))
@ -73,12 +72,22 @@
return agentName == CurrentViewCastAgent; return agentName == CurrentViewCastAgent;
} }
public void DuPa(string duuupa)
async Task OnStreamDataReceived(byte[] streamData)
{ {
var s= "DuPa"; //await foreach (var t in streamData)
//{
// var base64 = Convert.ToBase64String(t);
// imageSource = String.Format("data:image/jpg;base64,{0}", base64);
// StateHasChanged();
//}
var base64 = Convert.ToBase64String(streamData);
imageSource = String.Format("data:image/jpg;base64,{0}", base64);
StateHasChanged();
} }
async Task OnStreamDataReceived(byte[] streamData) async Task OnStreamDataReceivedEn(byte[] streamData)
{ {
//await foreach (var t in streamData) //await foreach (var t in streamData)
//{ //{
@ -103,7 +112,7 @@
private async Task OnStopViewCastClicked(string agentName) private async Task OnStopViewCastClicked(string agentName)
{ {
CurrentViewCastAgent = null; CurrentViewCastAgent = null;
await connection.InvokeAsync("Stopstream", agentName); await connection.InvokeAsync("StopStream", agentName);
imageSource = null; imageSource = null;
StateHasChanged(); StateHasChanged();
} }